From 179f871b897ff57f4c3e428e638a1b62b6edef75 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 26 Oct 2020 14:18:19 -0400 Subject: [PATCH 1/6] ARROW-10080: [R] Call gc() and try again in MemoryPool --- cpp/src/arrow/memory_pool.cc | 4 ---- cpp/src/arrow/memory_pool.h | 4 ++-- r/man/enums.Rd | 2 +- r/src/memorypool.cpp | 39 ++++++++++++++++++++++++++++++++++-- 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 787c01b153e..c0ff19c4747 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -264,10 +264,6 @@ class MimallocAllocator { } // namespace -MemoryPool::MemoryPool() {} - -MemoryPool::~MemoryPool() {} - int64_t MemoryPool::max_memory() const { return -1; } /////////////////////////////////////////////////////////////////////// diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h index b875e54b440..71a39b9cb25 100644 --- a/cpp/src/arrow/memory_pool.h +++ b/cpp/src/arrow/memory_pool.h @@ -63,7 +63,7 @@ class MemoryPoolStats { /// take care of the required 64-byte alignment. class ARROW_EXPORT MemoryPool { public: - virtual ~MemoryPool(); + virtual ~MemoryPool() = default; /// \brief EXPERIMENTAL. Create a new instance of the default MemoryPool static std::unique_ptr CreateDefault(); @@ -101,7 +101,7 @@ class ARROW_EXPORT MemoryPool { virtual std::string backend_name() const = 0; protected: - MemoryPool(); + MemoryPool() = default; }; class ARROW_EXPORT LoggingMemoryPool : public MemoryPool { diff --git a/r/man/enums.Rd b/r/man/enums.Rd index 67890fbb4a5..e4cb2d85469 100644 --- a/r/man/enums.Rd +++ b/r/man/enums.Rd @@ -19,7 +19,7 @@ An object of class \code{TimeUnit::type} (inherits from \code{arrow-enum}) of le An object of class \code{DateUnit} (inherits from \code{arrow-enum}) of length 2. -An object of class \code{Type::type} (inherits from \code{arrow-enum}) of length 36. +An object of class \code{Type::type} (inherits from \code{arrow-enum}) of length 37. An object of class \code{StatusCode} (inherits from \code{arrow-enum}) of length 17. diff --git a/r/src/memorypool.cpp b/r/src/memorypool.cpp index bc2f576d76e..fc25e7c331c 100644 --- a/r/src/memorypool.cpp +++ b/r/src/memorypool.cpp @@ -19,10 +19,45 @@ #if defined(ARROW_R_WITH_ARROW) #include +class GcMemoryPool : public arrow::MemoryPool { + public: + GcMemoryPool() + : pool_(arrow::default_memory_pool()), gc_(cpp11::package("base")["gc"]) {} + + arrow::Status Allocate(int64_t size, uint8_t** out) override { + return GcAndTryAgain([&] { return pool_->Allocate(size, out); }); + } + + arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override { + return GcAndTryAgain([&] { return pool_->Reallocate(old_size, new_size, ptr); }); + } + + void Free(uint8_t* buffer, int64_t size) override { pool_->Free(buffer, size); } + + int64_t bytes_allocated() const override { return pool_->bytes_allocated(); } + + int64_t max_memory() const override { return pool_->max_memory(); } + + std::string backend_name() const override { return pool_->backend_name() + "-gc"; } + + private: + template + arrow::Status GcAndTryAgain(const Call& call) { + if (call().ok()) { + return arrow::Status::OK(); + } + // ARROW-10080 + gc_(); + return call(); + } + + arrow::MemoryPool* pool_; + cpp11::function gc_; +}; + // [[arrow::export]] std::shared_ptr MemoryPool__default() { - return std::shared_ptr(arrow::default_memory_pool(), - [](arrow::MemoryPool* not_deleted) {}); + return std::make_shared(); } // [[arrow::export]] From 0ef1462688698d9785899f4b388cd3d348021563 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 27 Oct 2020 09:10:02 -0400 Subject: [PATCH 2/6] replace usages of default_memory_pool() with gc_memory_pool(), add test --- cpp/src/arrow/io/memory.cc | 1 + r/src/array_from_vector.cpp | 6 +++--- r/src/array_to_vector.cpp | 6 +++--- r/src/arrow_types.h | 1 + r/src/compute.cpp | 4 +++- r/src/csv.cpp | 2 +- r/src/dataset.cpp | 4 +++- r/src/io.cpp | 4 ++-- r/src/json.cpp | 2 +- r/src/memorypool.cpp | 21 +++++++++++++++------ r/src/parquet.cpp | 7 +++---- r/tests/testthat/test-arrow.R | 14 ++++++++++++++ r/tests/testthat/test-dataset.R | 4 ++-- 13 files changed, 52 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 361895494e1..1cde5e64e01 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -54,6 +54,7 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr& b Result> BufferOutputStream::Create( int64_t initial_capacity, MemoryPool* pool) { // ctor is private, so cannot use make_shared + DCHECK_NE(pool, nullptr); auto ptr = std::shared_ptr(new BufferOutputStream); RETURN_NOT_OK(ptr->Reset(initial_capacity, pool)); return ptr; diff --git a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp index ed3df89b242..148229e33ca 100644 --- a/r/src/array_from_vector.cpp +++ b/r/src/array_from_vector.cpp @@ -321,7 +321,7 @@ struct VectorToArrayConverter { static std::shared_ptr Visit(SEXP x, const std::shared_ptr& type) { std::unique_ptr builder; - StopIfNotOk(MakeBuilder(arrow::default_memory_pool(), type, &builder)); + StopIfNotOk(MakeBuilder(gc_memory_pool(), type, &builder)); VectorToArrayConverter converter{x, builder.get()}; StopIfNotOk(arrow::VisitTypeInline(*type, &converter)); @@ -1512,7 +1512,7 @@ std::shared_ptr Array__from_vector( // Create ArrayBuilder for type std::unique_ptr type_builder; - StopIfNotOk(arrow::MakeBuilder(arrow::default_memory_pool(), type, &type_builder)); + StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), type, &type_builder)); StopIfNotOk(converter->Init(type_builder.get())); // ingest R data and grab the result array @@ -1568,7 +1568,7 @@ std::shared_ptr ChunkedArray__from_list(cpp11::list chunks, if (n == 0) { std::shared_ptr array; std::unique_ptr type_builder; - StopIfNotOk(arrow::MakeBuilder(arrow::default_memory_pool(), type, &type_builder)); + StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), type, &type_builder)); StopIfNotOk(type_builder->Finish(&array)); vec.push_back(array); } else { diff --git a/r/src/array_to_vector.cpp b/r/src/array_to_vector.cpp index 51f0e3308ef..6c8a0bd5334 100644 --- a/r/src/array_to_vector.cpp +++ b/r/src/array_to_vector.cpp @@ -643,7 +643,7 @@ class Converter_Struct : public Converter { auto struct_array = checked_cast(array.get()); int nf = converters.size(); // Flatten() deals with merging of nulls - auto arrays = ValueOrStop(struct_array->Flatten(default_memory_pool())); + auto arrays = ValueOrStop(struct_array->Flatten(gc_memory_pool())); for (int i = 0; i < nf; i++) { StopIfNotOk(converters[i]->Ingest_some_nulls(VECTOR_ELT(data, i), arrays[i], start, n, chunk_index)); @@ -818,7 +818,7 @@ class Converter_List : public Converter { // Build an empty array to match value_type std::unique_ptr builder; - StopIfNotOk(arrow::MakeBuilder(arrow::default_memory_pool(), value_type_, &builder)); + StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), value_type_, &builder)); std::shared_ptr array; StopIfNotOk(builder->Finish(&array)); @@ -869,7 +869,7 @@ class Converter_FixedSizeList : public Converter { // Build an empty array to match value_type std::unique_ptr builder; - StopIfNotOk(arrow::MakeBuilder(arrow::default_memory_pool(), value_type_, &builder)); + StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), value_type_, &builder)); std::shared_ptr array; StopIfNotOk(builder->Finish(&array)); diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 6e664ff5e5a..765e38eb568 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -37,6 +37,7 @@ SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked SEXP Array__as_vector(const std::shared_ptr& array); std::shared_ptr Array__from_vector(SEXP x, SEXP type); std::shared_ptr RecordBatch__from_arrays(SEXP, SEXP); +arrow::MemoryPool* gc_memory_pool(); namespace arrow { diff --git a/r/src/compute.cpp b/r/src/compute.cpp index e2ae9a0fdc6..4920cc957f2 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -179,7 +179,9 @@ std::shared_ptr make_compute_options( SEXP compute__CallFunction(std::string func_name, cpp11::list args, cpp11::list options) { auto opts = make_compute_options(func_name, options); auto datum_args = arrow::r::from_r_list(args); - auto out = ValueOrStop(arrow::compute::CallFunction(func_name, datum_args, opts.get())); + arrow::compute::ExecContext context(gc_memory_pool()); + auto out = ValueOrStop( + arrow::compute::CallFunction(func_name, datum_args, opts.get(), &context)); return from_datum(out); } diff --git a/r/src/csv.cpp b/r/src/csv.cpp index efa646266a0..dea14751026 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -141,7 +141,7 @@ std::shared_ptr csv___TableReader__Make( const std::shared_ptr& read_options, const std::shared_ptr& parse_options, const std::shared_ptr& convert_options) { - return ValueOrStop(arrow::csv::TableReader::Make(arrow::default_memory_pool(), input, + return ValueOrStop(arrow::csv::TableReader::Make(gc_memory_pool(), input, *read_options, *parse_options, *convert_options)); } diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 8a88ab02f87..4327b80f187 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -33,7 +33,9 @@ namespace fs = ::arrow::fs; // [[arrow::export]] std::shared_ptr dataset___Dataset__NewScan( const std::shared_ptr& ds) { - return ValueOrStop(ds->NewScan()); + auto context = std::make_shared(); + context->pool = gc_memory_pool(); + return ValueOrStop(ds->NewScan(std::move(context))); } // [[arrow::export]] diff --git a/r/src/io.cpp b/r/src/io.cpp index 8cfbea02149..90e7e87b214 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -150,8 +150,8 @@ std::shared_ptr io___FileOutputStream__Open( // [[arrow::export]] std::shared_ptr io___BufferOutputStream__Create( int64_t initial_capacity) { - return ValueOrStop(arrow::io::BufferOutputStream::Create(initial_capacity, - arrow::default_memory_pool())); + return ValueOrStop( + arrow::io::BufferOutputStream::Create(initial_capacity, gc_memory_pool())); } // [[arrow::export]] diff --git a/r/src/json.cpp b/r/src/json.cpp index 52e50373777..87d40623f6b 100644 --- a/r/src/json.cpp +++ b/r/src/json.cpp @@ -44,7 +44,7 @@ std::shared_ptr json___TableReader__Make( const std::shared_ptr& input, const std::shared_ptr& read_options, const std::shared_ptr& parse_options) { - return ValueOrStop(arrow::json::TableReader::Make(arrow::default_memory_pool(), input, + return ValueOrStop(arrow::json::TableReader::Make(gc_memory_pool(), input, *read_options, *parse_options)); } diff --git a/r/src/memorypool.cpp b/r/src/memorypool.cpp index fc25e7c331c..05b79dc3929 100644 --- a/r/src/memorypool.cpp +++ b/r/src/memorypool.cpp @@ -18,11 +18,11 @@ #include "./arrow_types.h" #if defined(ARROW_R_WITH_ARROW) #include +#include class GcMemoryPool : public arrow::MemoryPool { public: - GcMemoryPool() - : pool_(arrow::default_memory_pool()), gc_(cpp11::package("base")["gc"]) {} + GcMemoryPool() : pool_(arrow::default_memory_pool()) {} arrow::Status Allocate(int64_t size, uint8_t** out) override { return GcAndTryAgain([&] { return pool_->Allocate(size, out); }); @@ -45,19 +45,28 @@ class GcMemoryPool : public arrow::MemoryPool { arrow::Status GcAndTryAgain(const Call& call) { if (call().ok()) { return arrow::Status::OK(); + } else { + auto lock = mutex_.Lock(); + + // ARROW-10080: Allocation may fail spuriously since the garbage collector is lazy. + // Force it to run then try again in case any reusable allocations have been freed. + static cpp11::function gc = cpp11::package("base")["gc"]; + gc(); } - // ARROW-10080 - gc_(); return call(); } + arrow::util::Mutex mutex_; arrow::MemoryPool* pool_; - cpp11::function gc_; }; +static GcMemoryPool g_pool; + +arrow::MemoryPool* gc_memory_pool() { return &g_pool; } + // [[arrow::export]] std::shared_ptr MemoryPool__default() { - return std::make_shared(); + return std::shared_ptr(&g_pool, [](...) {}); } // [[arrow::export]] diff --git a/r/src/parquet.cpp b/r/src/parquet.cpp index 3ea492acda1..a1147acb3e2 100644 --- a/r/src/parquet.cpp +++ b/r/src/parquet.cpp @@ -286,9 +286,8 @@ std::shared_ptr parquet___arrow___ParquetFileWriter_ const std::shared_ptr& properties, const std::shared_ptr& arrow_properties) { std::unique_ptr writer; - PARQUET_THROW_NOT_OK( - parquet::arrow::FileWriter::Open(*schema, arrow::default_memory_pool(), sink, - properties, arrow_properties, &writer)); + PARQUET_THROW_NOT_OK(parquet::arrow::FileWriter::Open( + *schema, gc_memory_pool(), sink, properties, arrow_properties, &writer)); return std::move(writer); } @@ -311,7 +310,7 @@ void parquet___arrow___WriteTable( const std::shared_ptr& sink, const std::shared_ptr& properties, const std::shared_ptr& arrow_properties) { - PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), + PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, gc_memory_pool(), sink, table->num_rows(), properties, arrow_properties)); } diff --git a/r/tests/testthat/test-arrow.R b/r/tests/testthat/test-arrow.R index f1b70e478c8..91720ed9b7b 100644 --- a/r/tests/testthat/test-arrow.R +++ b/r/tests/testthat/test-arrow.R @@ -60,3 +60,17 @@ test_that("arrow gracefully fails to load objects from other sessions (ARROW-100 test_that("check for an ArrowObject in functions use std::shared_ptr", { expect_error(Array__length(1), "Invalid R object") }) + +test_that("MemoryPool calls gc() to free memory when allocation fails (ARROW-10080)", { + env <- new.env() + trace(gc, print = FALSE, tracer = function() { + env$gc_was_called <- TRUE + }) + expect_error( + BufferOutputStream$create(2 ** 60), + "Out of memory" + ) + expect_true(env$gc_was_called) + + untrace(gc) +}) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index d1904fc1060..73d654eb5a1 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -122,13 +122,13 @@ test_that("Simple interface for datasets", { ) }) -test_that("dim method returns the correct number of rows and columns",{ +test_that("dim method returns the correct number of rows and columns", { ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8())) expect_identical(dim(ds), c(20L, 7L)) }) -test_that("dim() correctly determine numbers of rows and columns on arrow_dplyr_query object",{ +test_that("dim() correctly determine numbers of rows and columns on arrow_dplyr_query object", { ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8())) expect_warning( From 2672ec0b60222d797e5abeaea3127434bcb0faea Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 27 Oct 2020 10:51:44 -0400 Subject: [PATCH 3/6] lint fix --- r/src/csv.cpp | 5 ++--- r/src/parquet.cpp | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index dea14751026..54d3abc3821 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -141,9 +141,8 @@ std::shared_ptr csv___TableReader__Make( const std::shared_ptr& read_options, const std::shared_ptr& parse_options, const std::shared_ptr& convert_options) { - return ValueOrStop(arrow::csv::TableReader::Make(gc_memory_pool(), input, - *read_options, *parse_options, - *convert_options)); + return ValueOrStop(arrow::csv::TableReader::Make(gc_memory_pool(), input, *read_options, + *parse_options, *convert_options)); } // [[arrow::export]] diff --git a/r/src/parquet.cpp b/r/src/parquet.cpp index a1147acb3e2..3f6b1113b5d 100644 --- a/r/src/parquet.cpp +++ b/r/src/parquet.cpp @@ -310,9 +310,8 @@ void parquet___arrow___WriteTable( const std::shared_ptr& sink, const std::shared_ptr& properties, const std::shared_ptr& arrow_properties) { - PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, gc_memory_pool(), - sink, table->num_rows(), properties, - arrow_properties)); + PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable( + *table, gc_memory_pool(), sink, table->num_rows(), properties, arrow_properties)); } // [[arrow::export]] From 929aa88b9cc5eca68a2e7565541a956555c5270d Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 27 Oct 2020 12:41:38 -0400 Subject: [PATCH 4/6] mop up some implict default_memory_pool()s --- cpp/src/arrow/compute/exec.cc | 3 ++- r/src/array_from_vector.cpp | 5 +++-- r/src/arrow_types.h | 6 ++++-- r/src/compute.cpp | 13 +++++++++---- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 64919f3f3d1..69e788a950d 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -583,7 +583,8 @@ class ScalarExecutor : public FunctionExecutorImpl { } else { // XXX: In the case where no outputs are omitted, is returning a 0-length // array always the correct move? - return MakeArrayOfNull(output_descr_.type, /*length=*/0).ValueOrDie(); + return MakeArrayOfNull(output_descr_.type, /*length=*/0, exec_ctx_->memory_pool()) + .ValueOrDie(); } } } diff --git a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp index 148229e33ca..1066f1d0876 100644 --- a/r/src/array_from_vector.cpp +++ b/r/src/array_from_vector.cpp @@ -342,7 +342,7 @@ std::shared_ptr MakeFactorArrayImpl(cpp11::integers factor, auto n = factor.size(); std::shared_ptr indices_buffer = - ValueOrStop(AllocateBuffer(n * sizeof(value_type))); + ValueOrStop(AllocateBuffer(n * sizeof(value_type), gc_memory_pool())); std::vector> buffers{nullptr, indices_buffer}; @@ -1367,7 +1367,8 @@ std::shared_ptr MakeSimpleArray(SEXP x) { auto first_na = std::find_if(p_vec_start, p_vec_end, is_na); if (first_na < p_vec_end) { - auto null_bitmap = ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n))); + auto null_bitmap = + ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool())); internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n); // first loop to clear all the bits before the first NA diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 765e38eb568..a25f0a639d0 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -71,8 +71,10 @@ template class RBuffer : public MutableBuffer { public: explicit RBuffer(RVector vec) - : MutableBuffer(reinterpret_cast(DATAPTR(vec)), - vec.size() * sizeof(typename RVector::value_type)), + : MutableBuffer( + reinterpret_cast(DATAPTR(vec)), + vec.size() * sizeof(typename RVector::value_type), + arrow::CPUMemoryManager::Make(CPUDevice::Instance(), gc_memory_pool())), vec_(vec) {} private: diff --git a/r/src/compute.cpp b/r/src/compute.cpp index 4920cc957f2..3c288c93455 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -23,6 +23,11 @@ #include #include +arrow::compute::ExecContext* gc_context() { + static arrow::compute::ExecContext context(gc_memory_pool()); + return &context; +} + // [[arrow::export]] std::shared_ptr compute___CastOptions__initialize( bool allow_int_overflow, bool allow_time_truncate, bool allow_float_truncate) { @@ -38,7 +43,7 @@ std::shared_ptr Array__cast( const std::shared_ptr& array, const std::shared_ptr& target_type, const std::shared_ptr& options) { - return ValueOrStop(arrow::compute::Cast(*array, target_type, *options)); + return ValueOrStop(arrow::compute::Cast(*array, target_type, *options, gc_context())); } // [[arrow::export]] @@ -47,7 +52,8 @@ std::shared_ptr ChunkedArray__cast( const std::shared_ptr& target_type, const std::shared_ptr& options) { arrow::Datum value(chunked_array); - arrow::Datum out = ValueOrStop(arrow::compute::Cast(value, target_type, *options)); + arrow::Datum out = + ValueOrStop(arrow::compute::Cast(value, target_type, *options, gc_context())); return out.chunked_array(); } @@ -179,9 +185,8 @@ std::shared_ptr make_compute_options( SEXP compute__CallFunction(std::string func_name, cpp11::list args, cpp11::list options) { auto opts = make_compute_options(func_name, options); auto datum_args = arrow::r::from_r_list(args); - arrow::compute::ExecContext context(gc_memory_pool()); auto out = ValueOrStop( - arrow::compute::CallFunction(func_name, datum_args, opts.get(), &context)); + arrow::compute::CallFunction(func_name, datum_args, opts.get(), gc_context())); return from_datum(out); } From ec4ac04b44d25af5ad29c63e5790413bdbfcbe3c Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 27 Oct 2020 16:27:17 -0400 Subject: [PATCH 5/6] more explicit memory_pool usage --- .../arrow/compute/kernels/vector_selection.cc | 6 ++++-- cpp/src/arrow/ipc/feather.h | 2 +- cpp/src/arrow/ipc/metadata_internal.cc | 19 ++++++++++++------- cpp/src/arrow/ipc/metadata_internal.h | 5 +++-- r/src/array_from_vector.cpp | 3 ++- r/src/arrow_types.h | 7 +++---- r/src/compression.cpp | 6 ++++-- r/src/io.cpp | 2 +- r/src/parquet.cpp | 3 ++- r/src/scalar.cpp | 2 +- 10 files changed, 33 insertions(+), 22 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc index 21e454a5683..1062b53c8c3 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection.cc @@ -1793,7 +1793,8 @@ void StructFilter(KernelContext* ctx, const ExecBatch& batch, Datum* out) { std::shared_ptr indices; KERNEL_RETURN_IF_ERROR( ctx, - GetTakeIndices(*batch[1].array(), FilterState::Get(ctx).null_selection_behavior) + GetTakeIndices(*batch[1].array(), FilterState::Get(ctx).null_selection_behavior, + ctx->memory_pool()) .Value(&indices)); Datum result; @@ -1820,7 +1821,8 @@ Result> FilterRecordBatch(const RecordBatch& batch, const auto& filter_opts = *static_cast(options); ARROW_ASSIGN_OR_RAISE( std::shared_ptr indices, - GetTakeIndices(*filter.array(), filter_opts.null_selection_behavior)); + GetTakeIndices(*filter.array(), filter_opts.null_selection_behavior, + ctx->memory_pool())); std::vector> columns(batch.num_columns()); for (int i = 0; i < batch.num_columns(); ++i) { ARROW_ASSIGN_OR_RAISE(Datum out, Take(batch.column(i)->data(), Datum(indices), diff --git a/cpp/src/arrow/ipc/feather.h b/cpp/src/arrow/ipc/feather.h index 187e8277072..b40893c408f 100644 --- a/cpp/src/arrow/ipc/feather.h +++ b/cpp/src/arrow/ipc/feather.h @@ -25,7 +25,7 @@ #include #include -#include "arrow/result.h" +#include "arrow/type_fwd.h" #include "arrow/util/compression.h" #include "arrow/util/visibility.h" diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 10cee49963e..6c7fc51804e 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -869,12 +869,12 @@ Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, Result> WriteFBMessage( FBB& fbb, flatbuf::MessageHeader header_type, flatbuffers::Offset header, int64_t body_length, MetadataVersion version, - const std::shared_ptr& custom_metadata = nullptr) { + const std::shared_ptr& custom_metadata, MemoryPool* pool) { auto message = flatbuf::CreateMessage(fbb, MetadataVersionToFlatbuffer(version), header_type, header, body_length, SerializeCustomMetadata(fbb, custom_metadata)); fbb.Finish(message); - return WriteFlatbufferBuilder(fbb); + return WriteFlatbufferBuilder(fbb, pool); } using FieldNodeVector = @@ -1181,7 +1181,8 @@ Status WriteSchemaMessage(const Schema& schema, const DictionaryFieldMapper& map flatbuffers::Offset fb_schema; RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, mapper, &fb_schema)); return WriteFBMessage(fbb, flatbuf::MessageHeader::Schema, fb_schema.Union(), - /*body_length=*/0, options.metadata_version) + /*body_length=*/0, options.metadata_version, + /*custom_metadata=*/nullptr, options.memory_pool) .Value(out); } @@ -1195,7 +1196,8 @@ Status WriteRecordBatchMessage( RETURN_NOT_OK( MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch)); return WriteFBMessage(fbb, flatbuf::MessageHeader::RecordBatch, record_batch.Union(), - body_length, options.metadata_version, custom_metadata) + body_length, options.metadata_version, custom_metadata, + options.memory_pool) .Value(out); } @@ -1229,7 +1231,8 @@ Result> WriteTensorMessage(const Tensor& tensor, flatbuf::CreateTensor(fbb, fb_type_type, fb_type, fb_shape, fb_strides, &buffer); return WriteFBMessage(fbb, flatbuf::MessageHeader::Tensor, fb_tensor.Union(), - body_length, options.metadata_version); + body_length, options.metadata_version, + /*custom_metadata=*/nullptr, options.memory_pool); } Result> WriteSparseTensorMessage( @@ -1240,7 +1243,8 @@ Result> WriteSparseTensorMessage( RETURN_NOT_OK( MakeSparseTensor(fbb, sparse_tensor, body_length, buffers, &fb_sparse_tensor)); return WriteFBMessage(fbb, flatbuf::MessageHeader::SparseTensor, - fb_sparse_tensor.Union(), body_length, options.metadata_version); + fb_sparse_tensor.Union(), body_length, options.metadata_version, + /*custom_metadata=*/nullptr, options.memory_pool); } Status WriteDictionaryMessage( @@ -1255,7 +1259,8 @@ Status WriteDictionaryMessage( auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch, is_delta).Union(); return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch, - body_length, options.metadata_version, custom_metadata) + body_length, options.metadata_version, custom_metadata, + options.memory_pool) .Value(out); } diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h index 7a69e7378af..d5c697fe57b 100644 --- a/cpp/src/arrow/ipc/metadata_internal.h +++ b/cpp/src/arrow/ipc/metadata_internal.h @@ -199,10 +199,11 @@ Status WriteDictionaryMessage( const IpcWriteOptions& options, std::shared_ptr* out); static inline Result> WriteFlatbufferBuilder( - flatbuffers::FlatBufferBuilder& fbb) { + flatbuffers::FlatBufferBuilder& fbb, // NOLINT non-const reference + MemoryPool* pool = default_memory_pool()) { int32_t size = fbb.GetSize(); - ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(size)); + ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(size, pool)); uint8_t* dst = result->mutable_data(); memcpy(dst, fbb.GetBufferPointer(), size); diff --git a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp index 1066f1d0876..c2d22868535 100644 --- a/r/src/array_from_vector.cpp +++ b/r/src/array_from_vector.cpp @@ -357,7 +357,8 @@ std::shared_ptr MakeFactorArrayImpl(cpp11::integers factor, if (i < n) { // there are NA's so we need a null buffer - auto null_buffer = ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n))); + auto null_buffer = + ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool())); internal::FirstTimeBitmapWriter null_bitmap_writer(null_buffer->mutable_data(), 0, n); // catch up diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index a25f0a639d0..4a1845a16bc 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -71,10 +71,9 @@ template class RBuffer : public MutableBuffer { public: explicit RBuffer(RVector vec) - : MutableBuffer( - reinterpret_cast(DATAPTR(vec)), - vec.size() * sizeof(typename RVector::value_type), - arrow::CPUMemoryManager::Make(CPUDevice::Instance(), gc_memory_pool())), + : MutableBuffer(reinterpret_cast(DATAPTR(vec)), + vec.size() * sizeof(typename RVector::value_type), + arrow::CPUDevice::memory_manager(gc_memory_pool())), vec_(vec) {} private: diff --git a/r/src/compression.cpp b/r/src/compression.cpp index c406649a74b..18c63e4fd19 100644 --- a/r/src/compression.cpp +++ b/r/src/compression.cpp @@ -41,14 +41,16 @@ bool util___Codec__IsAvailable(arrow::Compression::type codec) { std::shared_ptr io___CompressedOutputStream__Make( const std::shared_ptr& codec, const std::shared_ptr& raw) { - return ValueOrStop(arrow::io::CompressedOutputStream::Make(codec.get(), raw)); + return ValueOrStop( + arrow::io::CompressedOutputStream::Make(codec.get(), raw, gc_memory_pool())); } // [[arrow::export]] std::shared_ptr io___CompressedInputStream__Make( const std::shared_ptr& codec, const std::shared_ptr& raw) { - return ValueOrStop(arrow::io::CompressedInputStream::Make(codec.get(), raw)); + return ValueOrStop( + arrow::io::CompressedInputStream::Make(codec.get(), raw, gc_memory_pool())); } #endif diff --git a/r/src/io.cpp b/r/src/io.cpp index 90e7e87b214..6a912dd7815 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -111,7 +111,7 @@ void io___MemoryMappedFile__Resize(const std::shared_ptr io___ReadableFile__Open( const std::string& path) { - return ValueOrStop(arrow::io::ReadableFile::Open(path)); + return ValueOrStop(arrow::io::ReadableFile::Open(path, gc_memory_pool())); } // ------ arrow::io::BufferReader diff --git a/r/src/parquet.cpp b/r/src/parquet.cpp index 3f6b1113b5d..6f8db31410f 100644 --- a/r/src/parquet.cpp +++ b/r/src/parquet.cpp @@ -62,7 +62,8 @@ std::shared_ptr parquet___arrow___FileReader__OpenFi std::unique_ptr reader; parquet::arrow::FileReaderBuilder builder; PARQUET_THROW_NOT_OK(builder.Open(file)); - PARQUET_THROW_NOT_OK(builder.properties(*props)->Build(&reader)); + PARQUET_THROW_NOT_OK( + builder.memory_pool(gc_memory_pool())->properties(*props)->Build(&reader)); return std::move(reader); } diff --git a/r/src/scalar.cpp b/r/src/scalar.cpp index 013ff1e6026..d9a3b569c36 100644 --- a/r/src/scalar.cpp +++ b/r/src/scalar.cpp @@ -54,7 +54,7 @@ std::shared_ptr StructScalar__GetFieldByName( // [[arrow::export]] SEXP Scalar__as_vector(const std::shared_ptr& scalar) { - auto array = ValueOrStop(arrow::MakeArrayFromScalar(*scalar, 1)); + auto array = ValueOrStop(arrow::MakeArrayFromScalar(*scalar, 1, gc_memory_pool())); // defined in array_to_vector.cpp SEXP Array__as_vector(const std::shared_ptr& array); From 67f7d14abd9db0cc8466b03828ad1a1865704d5e Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 29 Oct 2020 17:45:21 -0400 Subject: [PATCH 6/6] Update r/tests/testthat/test-arrow.R Co-authored-by: Neal Richardson --- r/tests/testthat/test-arrow.R | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/r/tests/testthat/test-arrow.R b/r/tests/testthat/test-arrow.R index 91720ed9b7b..2c901e1c96a 100644 --- a/r/tests/testthat/test-arrow.R +++ b/r/tests/testthat/test-arrow.R @@ -66,11 +66,9 @@ test_that("MemoryPool calls gc() to free memory when allocation fails (ARROW-100 trace(gc, print = FALSE, tracer = function() { env$gc_was_called <- TRUE }) - expect_error( - BufferOutputStream$create(2 ** 60), - "Out of memory" - ) + on.exit(untrace(gc)) + # We expect this should fail because we don't have this much memory, + # but it should gc() and retry (and fail again) + expect_error(BufferOutputStream$create(2 ** 60)) expect_true(env$gc_was_called) - - untrace(gc) })