From 91aaf46f1fce3c042430f74ac002255b97bfdf72 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Mon, 10 Jan 2022 11:55:34 +0100 Subject: [PATCH 01/25] Remove dead code in memory pool --- cpp/src/arrow/memory_pool.cc | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 2fab6f38255..6832b0382ef 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -949,18 +949,7 @@ class PoolBuffer final : public ResizableBuffer { return Status::OK(); } - static std::shared_ptr MakeShared(MemoryPool* pool) { - std::shared_ptr mm; - if (pool == nullptr) { - pool = default_memory_pool(); - mm = default_cpu_memory_manager(); - } else { - mm = CPUDevice::memory_manager(pool); - } - return std::make_shared(std::move(mm), pool); - } - - static std::unique_ptr MakeUnique(MemoryPool* pool) { + static std::unique_ptr Make(MemoryPool* pool) { std::shared_ptr mm; if (pool == nullptr) { pool = default_memory_pool(); @@ -989,13 +978,12 @@ inline Result ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t } // namespace Result> AllocateBuffer(const int64_t size, MemoryPool* pool) { - return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool), size); + return ResizePoolBuffer>(PoolBuffer::Make(pool), size); } Result> AllocateResizableBuffer(const int64_t size, MemoryPool* pool) { - return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool), - size); + return ResizePoolBuffer>(PoolBuffer::Make(pool), size); } } // namespace arrow From 297f0d70ce8c93cbe03d4f2e6e2441779b55cdcc Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Mon, 10 Jan 2022 20:04:32 +0100 Subject: [PATCH 02/25] Add MakeBufferOfZeros() and supporting MemoryPool abstractions --- cpp/src/arrow/array/util.cc | 3 +- cpp/src/arrow/buffer.h | 7 ++ cpp/src/arrow/memory_pool.cc | 154 ++++++++++++++++++++++++++++++++++- cpp/src/arrow/memory_pool.h | 56 +++++++++++++ 4 files changed, 217 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index 413182de0df..a4b3b01efd6 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -422,8 +422,7 @@ class NullArrayFactory { Status CreateBuffer() { ARROW_ASSIGN_OR_RAISE(int64_t buffer_length, GetBufferLength(type_, length_).Finish()); - ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(buffer_length, pool_)); - std::memset(buffer_->mutable_data(), 0, buffer_->size()); + ARROW_ASSIGN_OR_RAISE(buffer_, MakeBufferOfZeros(buffer_length, pool_)); return Status::OK(); } diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index cfd525ab2d6..37624c6977e 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -462,6 +462,13 @@ ARROW_EXPORT Result> AllocateResizableBuffer( const int64_t size, MemoryPool* pool = NULLPTR); +/// \brief Allocate or reuse an immutable buffer filled with zeros from a memory pool. +/// +/// \param[in] size size of buffer to allocate +/// \param[in] pool a memory pool +ARROW_EXPORT +Result> MakeBufferOfZeros(int64_t size, MemoryPool* pool); + /// \brief Allocate a bitmap buffer from a memory pool /// no guarantee on values is provided. /// diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 6832b0382ef..dc973d0af7d 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -30,6 +30,11 @@ #include #endif +#ifdef __linux__ +#define ARROW_MMAP_FOR_IMMUTABLE_ZEROS +#include +#endif + #include "arrow/buffer.h" #include "arrow/io/util_internal.h" #include "arrow/result.h" @@ -603,7 +608,99 @@ class BaseMemoryPoolImpl : public MemoryPool { stats_.UpdateAllocatedBytes(-size); } - void ReleaseUnused() override { Allocator::ReleaseUnused(); } + protected: + virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) { +#ifdef ARROW_MMAP_FOR_IMMUTABLE_ZEROS + if (size > 0) { + *out = static_cast(mmap( + NULLPTR, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0)); + if (*out == MAP_FAILED) { + auto err = errno; + return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ", + strerror(err)); + } + return Status::OK(); + } +#endif + RETURN_NOT_OK(Allocate(size, out)); + std::memset(out, 0, size); + return Status::OK(); + } + + void FreeImmutableZeros(uint8_t* buffer, int64_t size) override { +#ifdef ARROW_MMAP_FOR_IMMUTABLE_ZEROS + if (size > 0) { + munmap(buffer, size); + return; + } +#endif + Free(buffer, size); + } + + public: + Result> GetImmutableZeros(int64_t size) override { + // Thread-safely get the current largest buffer of zeros. + std::shared_ptr current_buffer; + { + std::lock_guard ga(immutable_zeros_access_mutex_); + current_buffer = immutable_zeros_cache_; + } + + // If no buffer exists yet or the current buffer isn't large enough, + // acquire the mutex for growing the buffer, then try again (another thread + // may have beat us to it and have grown the buffer already while we're + // waiting for the lock). + if (!current_buffer || current_buffer->size() < size) { + std::lock_guard gg(immutable_zeros_grow_mutex_); + { + std::lock_guard ga(immutable_zeros_access_mutex_); + current_buffer = immutable_zeros_cache_; + } + + // If the buffer is still not large enough, heuristically allocate a + // larger buffer. Try to allocate at least twice the size of the current + // buffer first, to prevent allocating lots of buffers for subsequent + // calls with slightly larger sizes. Fall back to the requested size if + // this fails. + if (!current_buffer || current_buffer->size() < size) { + uint8_t* data = NULLPTR; + int64_t alloc_size; + if (current_buffer && size < current_buffer->size() * 2) { + alloc_size = current_buffer->size() * 2; + if (!AllocateImmutableZeros(alloc_size, &data).ok()) { + data = NULLPTR; + } + } + if (data == NULLPTR) { + alloc_size = size; + RETURN_NOT_OK(AllocateImmutableZeros(alloc_size, &data)); + } + current_buffer = std::make_shared(data, alloc_size, this); + + // Save the new buffer for other threads to use. + { + std::lock_guard ga(immutable_zeros_access_mutex_); + immutable_zeros_cache_ = current_buffer; + } + } + } + + return std::move(current_buffer); + } + + void ReleaseUnused() override { + // Get rid of the ImmutableZeros cache if we're the only one using it. If + // there are other pieces of code using it, getting rid of the cache won't + // deallocate it anyway, so it's better to hold onto it. + { + std::lock_guard ga(immutable_zeros_access_mutex_); + if (immutable_zeros_cache_.use_count() <= 1) { + immutable_zeros_cache_.reset(); + } + } + + Allocator::ReleaseUnused(); + } int64_t bytes_allocated() const override { return stats_.bytes_allocated(); } @@ -611,6 +708,9 @@ class BaseMemoryPoolImpl : public MemoryPool { protected: internal::MemoryPoolStats stats_; + std::shared_ptr immutable_zeros_cache_; + std::mutex immutable_zeros_access_mutex_; + std::mutex immutable_zeros_grow_mutex_; }; class SystemMemoryPool : public BaseMemoryPoolImpl { @@ -719,6 +819,28 @@ static struct GlobalState { #endif } global_state; +MemoryPool::ImmutableZeros::~ImmutableZeros() { + // Avoid calling pool_->FreeImmutableZeros if the global pools are destroyed + // (XXX this will not work with user-defined pools) + + // This can happen if a Future is destructing on one thread while or + // after memory pools are destructed on the main thread (as there is + // no guarantee of destructor order between thread/memory pools) + if (data_ && !global_state.is_finalizing()) { + pool_->FreeImmutableZeros(data_, size_); + } +} + +Result> MemoryPool::GetImmutableZeros( + int64_t size) { + uint8_t* data; + RETURN_NOT_OK(Allocate(size, &data)); + std::memset(data, 0, size); + return std::make_shared(data, size, this); +} + +void MemoryPool::FreeImmutableZeros(uint8_t* buffer, int64_t size) { Free(buffer, size); } + MemoryPool* system_memory_pool() { return global_state.system_memory_pool(); } Status jemalloc_memory_pool(MemoryPool** out) { @@ -964,6 +1086,32 @@ class PoolBuffer final : public ResizableBuffer { MemoryPool* pool_; }; +/// An immutable Buffer containing zeros, whose lifetime is tied to a particular +/// MemoryPool +class ImmutableZerosPoolBuffer final : public Buffer { + public: + explicit ImmutableZerosPoolBuffer(std::shared_ptr&& zeros, + int64_t size, std::shared_ptr&& mm) + : Buffer(zeros->data(), size, std::move(mm)), zeros_(std::move(zeros)) {} + + static Result> Make(int64_t size, + MemoryPool* pool) { + std::shared_ptr mm; + if (pool == nullptr) { + pool = default_memory_pool(); + mm = default_cpu_memory_manager(); + } else { + mm = CPUDevice::memory_manager(pool); + } + ARROW_ASSIGN_OR_RAISE(auto zeros, pool->GetImmutableZeros(size)); + return std::unique_ptr( + new ImmutableZerosPoolBuffer(std::move(zeros), size, std::move(mm))); + } + + private: + std::shared_ptr zeros_; +}; + namespace { // A utility that does most of the work of the `AllocateBuffer` and // `AllocateResizableBuffer` methods. The argument `buffer` should be a smart pointer to @@ -986,4 +1134,8 @@ Result> AllocateResizableBuffer(const int64_t s return ResizePoolBuffer>(PoolBuffer::Make(pool), size); } +Result> MakeBufferOfZeros(const int64_t size, MemoryPool* pool) { + return ImmutableZerosPoolBuffer::Make(size, pool); +} + } // namespace arrow diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h index 58b375af3a9..0815d63843b 100644 --- a/cpp/src/arrow/memory_pool.h +++ b/cpp/src/arrow/memory_pool.h @@ -63,6 +63,42 @@ class MemoryPoolStats { /// take care of the required 64-byte alignment. class ARROW_EXPORT MemoryPool { public: + class ARROW_EXPORT ImmutableZeros { + public: + explicit ImmutableZeros(uint8_t* data, int64_t size, MemoryPool* pool) + : pool_(pool), data_(data), size_(size) {} + + ImmutableZeros() = default; + + ~ImmutableZeros(); + + // Prevent copies and handle moves explicitly to avoid double free + ImmutableZeros(const ImmutableZeros&) = delete; + ImmutableZeros& operator=(const ImmutableZeros&) = delete; + + ImmutableZeros(ImmutableZeros&& other) noexcept + : pool_(other.pool_), data_(other.data_), size_(other.size_) { + other.pool_ = NULLPTR; + other.data_ = NULLPTR; + other.size_ = 0; + } + + ImmutableZeros& operator=(ImmutableZeros&& other) noexcept { + std::swap(data_, other.data_); + std::swap(pool_, other.pool_); + std::swap(size_, other.size_); + return *this; + } + + const uint8_t* data() const { return data_; } + int64_t size() const { return size_; } + + private: + MemoryPool* pool_ = NULLPTR; + uint8_t* data_ = NULLPTR; + int64_t size_ = 0; + }; + virtual ~MemoryPool() = default; /// \brief EXPERIMENTAL. Create a new instance of the default MemoryPool @@ -87,6 +123,18 @@ class ARROW_EXPORT MemoryPool { /// faster deallocation if supported by its backend. virtual void Free(uint8_t* buffer, int64_t size) = 0; + /// Return a block of immutable zero bytes of at least the given size. + /// + /// These blocks are useful when a buffer, array, or some other readable block + /// of data is needed to comply with an interface, but the contents of the + /// block don't matter and/or can just be zero. Unlike other allocations, the + /// same underlying block of memory can be shared between all users (for a + /// particular size limit), thus reducing memory footprint. + /// + /// The allocated region shall be 64-byte aligned. A region will be + /// deallocated when all shared_ptrs to the region are destroyed. + virtual Result> GetImmutableZeros(int64_t size); + /// Return unused memory to the OS /// /// Only applies to allocators that hold onto unused memory. This will be @@ -109,6 +157,14 @@ class ARROW_EXPORT MemoryPool { protected: MemoryPool() = default; + + /// Free a memory region allocated by GetImmutableZeros(). + /// + /// @param buffer Pointer to the start of the allocated memory region + /// @param size Allocated size located at buffer. An allocator implementation + /// may use this for tracking the amount of allocated bytes as well as for + /// faster deallocation if supported by its backend. + virtual void FreeImmutableZeros(uint8_t* buffer, int64_t size); }; class ARROW_EXPORT LoggingMemoryPool : public MemoryPool { From 389a188d44eb3d09f3d05c0de3ec7c2e143606a0 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Tue, 18 Jan 2022 14:37:04 +0100 Subject: [PATCH 03/25] Separate MakeArrayOfNull and MakeArrayFromScalar into mutable and immutable variants --- cpp/src/arrow/array/array_test.cc | 223 ++++++++++++++++-------------- cpp/src/arrow/array/util.cc | 169 +++++++++++++++++++++- cpp/src/arrow/array/util.h | 34 ++++- 3 files changed, 315 insertions(+), 111 deletions(-) diff --git a/cpp/src/arrow/array/array_test.cc b/cpp/src/arrow/array/array_test.cc index 4e13c54d1c8..437ac6f8896 100644 --- a/cpp/src/arrow/array/array_test.cc +++ b/cpp/src/arrow/array/array_test.cc @@ -368,25 +368,27 @@ TEST_F(TestArray, TestMakeArrayOfNull) { // clang-format on }; - for (int64_t length : {0, 1, 16, 133}) { - for (auto type : types) { - ARROW_SCOPED_TRACE("type = ", type->ToString()); - ASSERT_OK_AND_ASSIGN(auto array, MakeArrayOfNull(type, length)); - ASSERT_EQ(array->type(), type); - ASSERT_OK(array->ValidateFull()); - ASSERT_EQ(array->length(), length); - if (is_union(type->id())) { - // For unions, MakeArrayOfNull places the nulls in the children - ASSERT_EQ(array->null_count(), 0); - const auto& union_array = checked_cast(*array); - for (int i = 0; i < union_array.num_fields(); ++i) { - ASSERT_EQ(union_array.field(i)->null_count(), union_array.field(i)->length()); - } - } else { - ASSERT_EQ(array->null_count(), length); - for (int64_t i = 0; i < length; ++i) { - ASSERT_TRUE(array->IsNull(i)); - ASSERT_FALSE(array->IsValid(i)); + for (auto fun : {MakeArrayOfNull, MakeImmutableArrayOfNull}) { + for (int64_t length : {0, 1, 16, 133}) { + for (auto type : types) { + ARROW_SCOPED_TRACE("type = ", type->ToString()); + ASSERT_OK_AND_ASSIGN(auto array, fun(type, length, default_memory_pool())); + ASSERT_EQ(array->type(), type); + ASSERT_OK(array->ValidateFull()); + ASSERT_EQ(array->length(), length); + if (is_union(type->id())) { + // For unions, MakeArrayOfNull places the nulls in the children + ASSERT_EQ(array->null_count(), 0); + const auto& union_array = checked_cast(*array); + for (int i = 0; i < union_array.num_fields(); ++i) { + ASSERT_EQ(union_array.field(i)->null_count(), union_array.field(i)->length()); + } + } else { + ASSERT_EQ(array->null_count(), length); + for (int64_t i = 0; i < length; ++i) { + ASSERT_TRUE(array->IsNull(i)); + ASSERT_FALSE(array->IsValid(i)); + } } } } @@ -397,51 +399,56 @@ TEST_F(TestArray, TestMakeArrayOfNullUnion) { // Unions need special checking -- the top level null count is 0 (per // ARROW-9222) so we check the first child to make sure is contains all nulls // and check that the type_ids all point to the first child - const int64_t union_length = 10; - auto s_union_ty = sparse_union({field("a", utf8()), field("b", int32())}, {0, 1}); - ASSERT_OK_AND_ASSIGN(auto s_union_nulls, MakeArrayOfNull(s_union_ty, union_length)); - ASSERT_OK(s_union_nulls->ValidateFull()); - ASSERT_EQ(s_union_nulls->null_count(), 0); - { - const auto& typed_union = checked_cast(*s_union_nulls); - ASSERT_EQ(typed_union.field(0)->null_count(), union_length); - - // Check type codes are all 0 - for (int i = 0; i < union_length; ++i) { - ASSERT_EQ(typed_union.raw_type_codes()[i], 0); + for (auto fun : {MakeArrayOfNull, MakeImmutableArrayOfNull}) { + const int64_t union_length = 10; + auto s_union_ty = sparse_union({field("a", utf8()), field("b", int32())}, {0, 1}); + ASSERT_OK_AND_ASSIGN(auto s_union_nulls, + fun(s_union_ty, union_length, default_memory_pool())); + ASSERT_OK(s_union_nulls->ValidateFull()); + ASSERT_EQ(s_union_nulls->null_count(), 0); + { + const auto& typed_union = checked_cast(*s_union_nulls); + ASSERT_EQ(typed_union.field(0)->null_count(), union_length); + + // Check type codes are all 0 + for (int i = 0; i < union_length; ++i) { + ASSERT_EQ(typed_union.raw_type_codes()[i], 0); + } } - } - - s_union_ty = sparse_union({field("a", utf8()), field("b", int32())}, {2, 7}); - ASSERT_OK_AND_ASSIGN(s_union_nulls, MakeArrayOfNull(s_union_ty, union_length)); - ASSERT_OK(s_union_nulls->ValidateFull()); - ASSERT_EQ(s_union_nulls->null_count(), 0); - { - const auto& typed_union = checked_cast(*s_union_nulls); - ASSERT_EQ(typed_union.field(0)->null_count(), union_length); - // Check type codes are all 2 - for (int i = 0; i < union_length; ++i) { - ASSERT_EQ(typed_union.raw_type_codes()[i], 2); + s_union_ty = sparse_union({field("a", utf8()), field("b", int32())}, {2, 7}); + ASSERT_OK_AND_ASSIGN(s_union_nulls, + fun(s_union_ty, union_length, default_memory_pool())); + ASSERT_OK(s_union_nulls->ValidateFull()); + ASSERT_EQ(s_union_nulls->null_count(), 0); + { + const auto& typed_union = checked_cast(*s_union_nulls); + ASSERT_EQ(typed_union.field(0)->null_count(), union_length); + + // Check type codes are all 2 + for (int i = 0; i < union_length; ++i) { + ASSERT_EQ(typed_union.raw_type_codes()[i], 2); + } } - } - - auto d_union_ty = dense_union({field("a", utf8()), field("b", int32())}, {0, 1}); - ASSERT_OK_AND_ASSIGN(auto d_union_nulls, MakeArrayOfNull(d_union_ty, union_length)); - ASSERT_OK(d_union_nulls->ValidateFull()); - ASSERT_EQ(d_union_nulls->null_count(), 0); - { - const auto& typed_union = checked_cast(*d_union_nulls); - // Child field has length 1 which is a null element - ASSERT_EQ(typed_union.field(0)->length(), 1); - ASSERT_EQ(typed_union.field(0)->null_count(), 1); - - // Check type codes are all 0 and the offsets point to the first element of - // the first child - for (int i = 0; i < union_length; ++i) { - ASSERT_EQ(typed_union.raw_type_codes()[i], 0); - ASSERT_EQ(typed_union.raw_value_offsets()[i], 0); + auto d_union_ty = dense_union({field("a", utf8()), field("b", int32())}, {0, 1}); + ASSERT_OK_AND_ASSIGN(auto d_union_nulls, + fun(d_union_ty, union_length, default_memory_pool())); + ASSERT_OK(d_union_nulls->ValidateFull()); + ASSERT_EQ(d_union_nulls->null_count(), 0); + { + const auto& typed_union = checked_cast(*d_union_nulls); + + // Child field has length 1 which is a null element + ASSERT_EQ(typed_union.field(0)->length(), 1); + ASSERT_EQ(typed_union.field(0)->null_count(), 1); + + // Check type codes are all 0 and the offsets point to the first element of + // the first child + for (int i = 0; i < union_length; ++i) { + ASSERT_EQ(typed_union.raw_type_codes()[i], 0); + ASSERT_EQ(typed_union.raw_value_offsets()[i], 0); + } } } } @@ -579,30 +586,32 @@ static ScalarVector GetScalars() { } TEST_F(TestArray, TestMakeArrayFromScalar) { - ASSERT_OK_AND_ASSIGN(auto null_array, MakeArrayFromScalar(NullScalar(), 5)); - ASSERT_OK(null_array->ValidateFull()); - ASSERT_EQ(null_array->length(), 5); - ASSERT_EQ(null_array->null_count(), 5); - - auto scalars = GetScalars(); + for (auto fun : {MakeArrayFromScalar, MakeImmutableArrayFromScalar}) { + ASSERT_OK_AND_ASSIGN(auto null_array, fun(NullScalar(), 5, default_memory_pool())); + ASSERT_OK(null_array->ValidateFull()); + ASSERT_EQ(null_array->length(), 5); + ASSERT_EQ(null_array->null_count(), 5); + + auto scalars = GetScalars(); + + for (int64_t length : {16}) { + for (auto scalar : scalars) { + ASSERT_OK_AND_ASSIGN(auto array, fun(*scalar, length, default_memory_pool())); + ASSERT_OK(array->ValidateFull()); + ASSERT_EQ(array->length(), length); + ASSERT_EQ(array->null_count(), 0); - for (int64_t length : {16}) { - for (auto scalar : scalars) { - ASSERT_OK_AND_ASSIGN(auto array, MakeArrayFromScalar(*scalar, length)); - ASSERT_OK(array->ValidateFull()); - ASSERT_EQ(array->length(), length); - ASSERT_EQ(array->null_count(), 0); - - // test case for ARROW-13321 - for (int64_t i : std::vector{0, length / 2, length - 1}) { - ASSERT_OK_AND_ASSIGN(auto s, array->GetScalar(i)); - AssertScalarsEqual(*s, *scalar, /*verbose=*/true); + // test case for ARROW-13321 + for (int64_t i : std::vector{0, length / 2, length - 1}) { + ASSERT_OK_AND_ASSIGN(auto s, array->GetScalar(i)); + AssertScalarsEqual(*s, *scalar, /*verbose=*/true); + } } } - } - for (auto scalar : scalars) { - AssertAppendScalar(pool_, scalar); + for (auto scalar : scalars) { + AssertAppendScalar(pool_, scalar); + } } } @@ -610,13 +619,15 @@ TEST_F(TestArray, TestMakeArrayFromScalarSliced) { // Regression test for ARROW-13437 auto scalars = GetScalars(); - for (auto scalar : scalars) { - SCOPED_TRACE(scalar->type->ToString()); - ASSERT_OK_AND_ASSIGN(auto array, MakeArrayFromScalar(*scalar, 32)); - auto sliced = array->Slice(1, 4); - ASSERT_EQ(sliced->length(), 4); - ASSERT_EQ(sliced->null_count(), 0); - ARROW_EXPECT_OK(sliced->ValidateFull()); + for (auto fun : {MakeArrayFromScalar, MakeImmutableArrayFromScalar}) { + for (auto scalar : scalars) { + SCOPED_TRACE(scalar->type->ToString()); + ASSERT_OK_AND_ASSIGN(auto array, fun(*scalar, 32, default_memory_pool())); + auto sliced = array->Slice(1, 4); + ASSERT_EQ(sliced->length(), 4); + ASSERT_EQ(sliced->null_count(), 0); + ARROW_EXPECT_OK(sliced->ValidateFull()); + } } } @@ -626,14 +637,16 @@ TEST_F(TestArray, TestMakeArrayFromDictionaryScalar) { ASSERT_OK_AND_ASSIGN(auto value, MakeScalar(int8(), 1)); auto scalar = DictionaryScalar({value, dictionary}, type); - ASSERT_OK_AND_ASSIGN(auto array, MakeArrayFromScalar(scalar, 4)); - ASSERT_OK(array->ValidateFull()); - ASSERT_EQ(array->length(), 4); - ASSERT_EQ(array->null_count(), 0); + for (auto fun : {MakeArrayFromScalar, MakeImmutableArrayFromScalar}) { + ASSERT_OK_AND_ASSIGN(auto array, fun(scalar, 4, default_memory_pool())); + ASSERT_OK(array->ValidateFull()); + ASSERT_EQ(array->length(), 4); + ASSERT_EQ(array->null_count(), 0); - for (int i = 0; i < 4; i++) { - ASSERT_OK_AND_ASSIGN(auto item, array->GetScalar(i)); - ASSERT_TRUE(item->Equals(scalar)); + for (int i = 0; i < 4; i++) { + ASSERT_OK_AND_ASSIGN(auto item, array->GetScalar(i)); + ASSERT_TRUE(item->Equals(scalar)); + } } } @@ -643,17 +656,21 @@ TEST_F(TestArray, TestMakeArrayFromMapScalar) { R"([{"key": "a", "value": 1}, {"key": "b", "value": 2}])"); auto scalar = MapScalar(value); - ASSERT_OK_AND_ASSIGN(auto array, MakeArrayFromScalar(scalar, 11)); - ASSERT_OK(array->ValidateFull()); - ASSERT_EQ(array->length(), 11); - ASSERT_EQ(array->null_count(), 0); + for (auto fun : {MakeArrayFromScalar, MakeImmutableArrayFromScalar}) { + ASSERT_OK_AND_ASSIGN(auto array, fun(scalar, 11, default_memory_pool())); + ASSERT_OK(array->ValidateFull()); + ASSERT_EQ(array->length(), 11); + ASSERT_EQ(array->null_count(), 0); - for (int i = 0; i < 11; i++) { - ASSERT_OK_AND_ASSIGN(auto item, array->GetScalar(i)); - ASSERT_TRUE(item->Equals(scalar)); - } + for (int i = 0; i < 11; i++) { + ASSERT_OK_AND_ASSIGN(auto item, array->GetScalar(i)); + ASSERT_TRUE(item->Equals(scalar)); + } - AssertAppendScalar(pool_, std::make_shared(scalar)); + if (fun != MakeImmutableArrayFromScalar) { + AssertAppendScalar(pool_, std::make_shared(scalar)); + } + } } TEST_F(TestArray, TestMakeEmptyArray) { diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index a4b3b01efd6..36b9cee02ef 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -319,7 +319,7 @@ namespace { // get the maximum buffer length required, then allocate a single zeroed buffer // to use anywhere a buffer is required -class NullArrayFactory { +class ImmutableNullArrayFactory { public: struct GetBufferLength { GetBufferLength(const std::shared_ptr& type, int64_t length) @@ -415,8 +415,8 @@ class NullArrayFactory { int64_t length_, buffer_length_; }; - NullArrayFactory(MemoryPool* pool, const std::shared_ptr& type, - int64_t length) + ImmutableNullArrayFactory(MemoryPool* pool, const std::shared_ptr& type, + int64_t length) : pool_(pool), type_(type), length_(length) {} Status CreateBuffer() { @@ -521,7 +521,7 @@ class NullArrayFactory { Result> CreateChild(const DataType& type, int i, int64_t length) { - NullArrayFactory child_factory(pool_, type.field(i)->type(), length); + ImmutableNullArrayFactory child_factory(pool_, type.field(i)->type(), length); child_factory.buffer_ = buffer_; return child_factory.Create(); } @@ -533,6 +533,150 @@ class NullArrayFactory { std::shared_ptr buffer_; }; +// mutable version of ImmutableNullArrayFactory +class NullArrayFactory { + public: + NullArrayFactory(MemoryPool* pool, const std::shared_ptr& type, + int64_t length) + : pool_(pool), type_(type), length_(length) {} + + Result> Create() { + std::vector> child_data(type_->num_fields()); + ARROW_ASSIGN_OR_RAISE(auto validity, CreateZeroBitBuffer(1)); + out_ = ArrayData::Make(type_, length_, {validity}, child_data, length_, 0); + RETURN_NOT_OK(VisitTypeInline(*type_, this)); + return out_; + } + + Result> CreateZeroByteBuffer(size_t scalar_size_bytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, + AllocateBuffer(length_ * scalar_size_bytes, pool_)); + std::memset(buffer->mutable_data(), 0, buffer->size()); + return buffer; + } + + Result> CreateZeroOffsetBuffer(size_t index_size_bytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, + AllocateBuffer((length_ + 1) * index_size_bytes, pool_)); + std::memset(buffer->mutable_data(), 0, buffer->size()); + return buffer; + } + + Result> CreateZeroBitBuffer(size_t scalar_size_bits) { + ARROW_ASSIGN_OR_RAISE(auto buffer, + AllocateBuffer((length_ * scalar_size_bits + 7) / 8, pool_)); + std::memset(buffer->mutable_data(), 0, buffer->size()); + return buffer; + } + + Result> CreateEmptyBuffer() { return AllocateBuffer(0); } + + Status Visit(const NullType&) { + out_->buffers.resize(1, nullptr); + return Status::OK(); + } + + Status Visit(const FixedWidthType& type) { + out_->buffers.resize(2); + // values + ARROW_ASSIGN_OR_RAISE(out_->buffers[1], CreateZeroBitBuffer(type.bit_width())); + return Status::OK(); + } + + template + enable_if_base_binary Visit(const T&) { + out_->buffers.resize(3); + // offsets + ARROW_ASSIGN_OR_RAISE(out_->buffers[1], + CreateZeroOffsetBuffer(sizeof(typename T::offset_type))); + // values + ARROW_ASSIGN_OR_RAISE(out_->buffers[2], CreateEmptyBuffer()); + return Status::OK(); + } + + template + enable_if_var_size_list Visit(const T& type) { + out_->buffers.resize(2); + // offsets + ARROW_ASSIGN_OR_RAISE(out_->buffers[1], + CreateZeroOffsetBuffer(sizeof(typename T::offset_type))); + // values + ARROW_ASSIGN_OR_RAISE(out_->child_data[0], CreateChild(type, 0, /*length=*/0)); + return Status::OK(); + } + + Status Visit(const FixedSizeListType& type) { + ARROW_ASSIGN_OR_RAISE(out_->child_data[0], + CreateChild(type, 0, length_ * type.list_size())); + return Status::OK(); + } + + Status Visit(const StructType& type) { + for (int i = 0; i < type_->num_fields(); ++i) { + ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(type, i, length_)); + } + return Status::OK(); + } + + Status Visit(const UnionType& type) { + out_->buffers.resize(2); + + // First buffer is always null + out_->buffers[0] = nullptr; + + // type ID buffer + ARROW_ASSIGN_OR_RAISE(out_->buffers[1], AllocateBuffer(length_, pool_)); + std::memset(out_->buffers[1]->mutable_data(), type.type_codes()[0], length_); + + // For sparse unions, we now create children with the same length as the + // parent + int64_t child_length = length_; + if (type.mode() == UnionMode::DENSE) { + // For dense unions, we set the offsets to all zero and create children + // with length 1 + out_->buffers.resize(3); + ARROW_ASSIGN_OR_RAISE(out_->buffers[2], CreateZeroByteBuffer(sizeof(int32_t))); + + child_length = 1; + } + for (int i = 0; i < type_->num_fields(); ++i) { + ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(type, i, child_length)); + } + return Status::OK(); + } + + Status Visit(const DictionaryType& type) { + out_->buffers.resize(2); + // dictionary indices + ARROW_ASSIGN_OR_RAISE(out_->buffers[1], CreateZeroBitBuffer(type.bit_width())); + // dictionary data + ARROW_ASSIGN_OR_RAISE(auto typed_null_dict, MakeArrayOfNull(type.value_type(), 0)); + out_->dictionary = typed_null_dict->data(); + return Status::OK(); + } + + Status Visit(const ExtensionType& type) { + out_->child_data.resize(type.storage_type()->num_fields()); + RETURN_NOT_OK(VisitTypeInline(*type.storage_type(), this)); + return Status::OK(); + } + + Status Visit(const DataType& type) { + return Status::NotImplemented("construction of all-null ", type); + } + + Result> CreateChild(const DataType& type, int i, + int64_t length) { + ImmutableNullArrayFactory child_factory(pool_, type.field(i)->type(), length); + return child_factory.Create(); + } + + MemoryPool* pool_; + std::shared_ptr type_; + int64_t length_; + std::shared_ptr out_; +}; + class RepeatedArrayFactory { public: RepeatedArrayFactory(MemoryPool* pool, const Scalar& scalar, int64_t length) @@ -782,6 +926,13 @@ Result> MakeArrayOfNull(const std::shared_ptr& return MakeArray(data); } +Result> MakeImmutableArrayOfNull( + const std::shared_ptr& type, int64_t length, MemoryPool* pool) { + ARROW_ASSIGN_OR_RAISE(auto data, + ImmutableNullArrayFactory(pool, type, length).Create()); + return MakeArray(data); +} + Result> MakeArrayFromScalar(const Scalar& scalar, int64_t length, MemoryPool* pool) { // Null union scalars still have a type code associated @@ -791,6 +942,16 @@ Result> MakeArrayFromScalar(const Scalar& scalar, int64_t return RepeatedArrayFactory(pool, scalar, length).Create(); } +Result> MakeImmutableArrayFromScalar(const Scalar& scalar, + int64_t length, + MemoryPool* pool) { + // Null union scalars still have a type code associated + if (!scalar.is_valid && !is_union(scalar.type->id())) { + return MakeImmutableArrayOfNull(scalar.type, length, pool); + } + return RepeatedArrayFactory(pool, scalar, length).Create(); +} + Result> MakeEmptyArray(std::shared_ptr type, MemoryPool* memory_pool) { std::unique_ptr builder; diff --git a/cpp/src/arrow/array/util.h b/cpp/src/arrow/array/util.h index 6e6c61bd03d..894e8d556f4 100644 --- a/cpp/src/arrow/array/util.h +++ b/cpp/src/arrow/array/util.h @@ -37,16 +37,30 @@ namespace arrow { ARROW_EXPORT std::shared_ptr MakeArray(const std::shared_ptr& data); -/// \brief Create a strongly-typed Array instance with all elements null -/// \param[in] type the array type -/// \param[in] length the array length +/// \brief Create a strongly-typed mutable Array instance with all elements initially set +/// to null +/// \param[in] type the array type \param[in] length the array length /// \param[in] pool the memory pool to allocate memory from ARROW_EXPORT Result> MakeArrayOfNull(const std::shared_ptr& type, int64_t length, MemoryPool* pool = default_memory_pool()); -/// \brief Create an Array instance whose slots are the given scalar +/// \brief Create a strongly-typed immutable Array instance with all elements null +/// +/// This function may reuse a single zero buffer, but may also defer to +/// MakeArrayOfNull(). +/// +/// \param[in] type the array type +/// \param[in] length the array length +/// \param[in] pool the memory pool to allocate memory from +ARROW_EXPORT +Result> MakeImmutableArrayOfNull( + const std::shared_ptr& type, int64_t length, + MemoryPool* pool = default_memory_pool()); + +/// \brief Create a mutable Array instance whose slots are initialized with the given +/// scalar /// \param[in] scalar the value with which to fill the array /// \param[in] length the array length /// \param[in] pool the memory pool to allocate memory from @@ -54,6 +68,18 @@ ARROW_EXPORT Result> MakeArrayFromScalar( const Scalar& scalar, int64_t length, MemoryPool* pool = default_memory_pool()); +/// \brief Create an immutable Array instance whose slots are set to the given scalar +/// +/// This function may reuse buffers if they contain the same (repeated) value to save +/// memory, but may also defer to MakeArrayFromScalar(). +/// +/// \param[in] scalar the value with which to fill the array +/// \param[in] length the array length +/// \param[in] pool the memory pool to allocate memory from +ARROW_EXPORT +Result> MakeImmutableArrayFromScalar( + const Scalar& scalar, int64_t length, MemoryPool* pool = default_memory_pool()); + /// \brief Create an empty Array of a given type /// /// The output Array will be of the given type. From 4db13974e15af379b7d681b07eb6dea1fc402398 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 19 Jan 2022 15:50:54 +0100 Subject: [PATCH 04/25] Fix one instance of mutable access to result of MakeArrayOfNulls() --- cpp/src/arrow/array/array_test.cc | 14 ++++++------ cpp/src/arrow/array/util.cc | 22 +++++++++---------- cpp/src/arrow/array/util.h | 16 +++++++------- .../arrow/compute/kernels/util_internal.cc | 2 +- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/cpp/src/arrow/array/array_test.cc b/cpp/src/arrow/array/array_test.cc index 437ac6f8896..9e11721784a 100644 --- a/cpp/src/arrow/array/array_test.cc +++ b/cpp/src/arrow/array/array_test.cc @@ -368,7 +368,7 @@ TEST_F(TestArray, TestMakeArrayOfNull) { // clang-format on }; - for (auto fun : {MakeArrayOfNull, MakeImmutableArrayOfNull}) { + for (auto fun : {MakeArrayOfNull, MakeMutableArrayOfNull}) { for (int64_t length : {0, 1, 16, 133}) { for (auto type : types) { ARROW_SCOPED_TRACE("type = ", type->ToString()); @@ -399,7 +399,7 @@ TEST_F(TestArray, TestMakeArrayOfNullUnion) { // Unions need special checking -- the top level null count is 0 (per // ARROW-9222) so we check the first child to make sure is contains all nulls // and check that the type_ids all point to the first child - for (auto fun : {MakeArrayOfNull, MakeImmutableArrayOfNull}) { + for (auto fun : {MakeArrayOfNull, MakeMutableArrayOfNull}) { const int64_t union_length = 10; auto s_union_ty = sparse_union({field("a", utf8()), field("b", int32())}, {0, 1}); ASSERT_OK_AND_ASSIGN(auto s_union_nulls, @@ -586,7 +586,7 @@ static ScalarVector GetScalars() { } TEST_F(TestArray, TestMakeArrayFromScalar) { - for (auto fun : {MakeArrayFromScalar, MakeImmutableArrayFromScalar}) { + for (auto fun : {MakeArrayFromScalar, MakeMutableArrayFromScalar}) { ASSERT_OK_AND_ASSIGN(auto null_array, fun(NullScalar(), 5, default_memory_pool())); ASSERT_OK(null_array->ValidateFull()); ASSERT_EQ(null_array->length(), 5); @@ -619,7 +619,7 @@ TEST_F(TestArray, TestMakeArrayFromScalarSliced) { // Regression test for ARROW-13437 auto scalars = GetScalars(); - for (auto fun : {MakeArrayFromScalar, MakeImmutableArrayFromScalar}) { + for (auto fun : {MakeArrayFromScalar, MakeMutableArrayFromScalar}) { for (auto scalar : scalars) { SCOPED_TRACE(scalar->type->ToString()); ASSERT_OK_AND_ASSIGN(auto array, fun(*scalar, 32, default_memory_pool())); @@ -637,7 +637,7 @@ TEST_F(TestArray, TestMakeArrayFromDictionaryScalar) { ASSERT_OK_AND_ASSIGN(auto value, MakeScalar(int8(), 1)); auto scalar = DictionaryScalar({value, dictionary}, type); - for (auto fun : {MakeArrayFromScalar, MakeImmutableArrayFromScalar}) { + for (auto fun : {MakeArrayFromScalar, MakeMutableArrayFromScalar}) { ASSERT_OK_AND_ASSIGN(auto array, fun(scalar, 4, default_memory_pool())); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(array->length(), 4); @@ -656,7 +656,7 @@ TEST_F(TestArray, TestMakeArrayFromMapScalar) { R"([{"key": "a", "value": 1}, {"key": "b", "value": 2}])"); auto scalar = MapScalar(value); - for (auto fun : {MakeArrayFromScalar, MakeImmutableArrayFromScalar}) { + for (auto fun : {MakeArrayFromScalar, MakeMutableArrayFromScalar}) { ASSERT_OK_AND_ASSIGN(auto array, fun(scalar, 11, default_memory_pool())); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(array->length(), 11); @@ -667,7 +667,7 @@ TEST_F(TestArray, TestMakeArrayFromMapScalar) { ASSERT_TRUE(item->Equals(scalar)); } - if (fun != MakeImmutableArrayFromScalar) { + if (fun != MakeMutableArrayFromScalar) { AssertAppendScalar(pool_, std::make_shared(scalar)); } } diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index 36b9cee02ef..832a086314a 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -920,34 +920,34 @@ class RepeatedArrayFactory { } // namespace -Result> MakeArrayOfNull(const std::shared_ptr& type, - int64_t length, MemoryPool* pool) { +Result> MakeMutableArrayOfNull( + const std::shared_ptr& type, int64_t length, MemoryPool* pool) { ARROW_ASSIGN_OR_RAISE(auto data, NullArrayFactory(pool, type, length).Create()); return MakeArray(data); } -Result> MakeImmutableArrayOfNull( - const std::shared_ptr& type, int64_t length, MemoryPool* pool) { +Result> MakeArrayOfNull(const std::shared_ptr& type, + int64_t length, MemoryPool* pool) { ARROW_ASSIGN_OR_RAISE(auto data, ImmutableNullArrayFactory(pool, type, length).Create()); return MakeArray(data); } -Result> MakeArrayFromScalar(const Scalar& scalar, int64_t length, - MemoryPool* pool) { +Result> MakeMutableArrayFromScalar(const Scalar& scalar, + int64_t length, + MemoryPool* pool) { // Null union scalars still have a type code associated if (!scalar.is_valid && !is_union(scalar.type->id())) { - return MakeArrayOfNull(scalar.type, length, pool); + return MakeMutableArrayOfNull(scalar.type, length, pool); } return RepeatedArrayFactory(pool, scalar, length).Create(); } -Result> MakeImmutableArrayFromScalar(const Scalar& scalar, - int64_t length, - MemoryPool* pool) { +Result> MakeArrayFromScalar(const Scalar& scalar, int64_t length, + MemoryPool* pool) { // Null union scalars still have a type code associated if (!scalar.is_valid && !is_union(scalar.type->id())) { - return MakeImmutableArrayOfNull(scalar.type, length, pool); + return MakeArrayOfNull(scalar.type, length, pool); } return RepeatedArrayFactory(pool, scalar, length).Create(); } diff --git a/cpp/src/arrow/array/util.h b/cpp/src/arrow/array/util.h index 894e8d556f4..5d7e0122827 100644 --- a/cpp/src/arrow/array/util.h +++ b/cpp/src/arrow/array/util.h @@ -42,9 +42,9 @@ std::shared_ptr MakeArray(const std::shared_ptr& data); /// \param[in] type the array type \param[in] length the array length /// \param[in] pool the memory pool to allocate memory from ARROW_EXPORT -Result> MakeArrayOfNull(const std::shared_ptr& type, - int64_t length, - MemoryPool* pool = default_memory_pool()); +Result> MakeMutableArrayOfNull( + const std::shared_ptr& type, int64_t length, + MemoryPool* pool = default_memory_pool()); /// \brief Create a strongly-typed immutable Array instance with all elements null /// @@ -55,9 +55,9 @@ Result> MakeArrayOfNull(const std::shared_ptr& /// \param[in] length the array length /// \param[in] pool the memory pool to allocate memory from ARROW_EXPORT -Result> MakeImmutableArrayOfNull( - const std::shared_ptr& type, int64_t length, - MemoryPool* pool = default_memory_pool()); +Result> MakeArrayOfNull(const std::shared_ptr& type, + int64_t length, + MemoryPool* pool = default_memory_pool()); /// \brief Create a mutable Array instance whose slots are initialized with the given /// scalar @@ -65,7 +65,7 @@ Result> MakeImmutableArrayOfNull( /// \param[in] length the array length /// \param[in] pool the memory pool to allocate memory from ARROW_EXPORT -Result> MakeArrayFromScalar( +Result> MakeMutableArrayFromScalar( const Scalar& scalar, int64_t length, MemoryPool* pool = default_memory_pool()); /// \brief Create an immutable Array instance whose slots are set to the given scalar @@ -77,7 +77,7 @@ Result> MakeArrayFromScalar( /// \param[in] length the array length /// \param[in] pool the memory pool to allocate memory from ARROW_EXPORT -Result> MakeImmutableArrayFromScalar( +Result> MakeArrayFromScalar( const Scalar& scalar, int64_t length, MemoryPool* pool = default_memory_pool()); /// \brief Create an empty Array of a given type diff --git a/cpp/src/arrow/compute/kernels/util_internal.cc b/cpp/src/arrow/compute/kernels/util_internal.cc index 846fa26baf2..7bd52b1074a 100644 --- a/cpp/src/arrow/compute/kernels/util_internal.cc +++ b/cpp/src/arrow/compute/kernels/util_internal.cc @@ -70,7 +70,7 @@ ArrayKernelExec TrivialScalarUnaryAsArraysExec(ArrayKernelExec exec, } ARROW_ASSIGN_OR_RAISE(Datum array_in, MakeArrayFromScalar(*batch[0].scalar(), 1)); - ARROW_ASSIGN_OR_RAISE(Datum array_out, MakeArrayFromScalar(*out->scalar(), 1)); + ARROW_ASSIGN_OR_RAISE(Datum array_out, MakeMutableArrayFromScalar(*out->scalar(), 1)); RETURN_NOT_OK(exec(ctx, ExecBatch{{std::move(array_in)}, 1}, &array_out)); ARROW_ASSIGN_OR_RAISE(*out, array_out.make_array()->GetScalar(0)); return Status::OK(); From fc5fac9d7fc10692833ef4afe54d4fa5819fbc5c Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 19 Jan 2022 18:30:29 +0100 Subject: [PATCH 05/25] Fix compilation on some compilers, clean up slightly --- cpp/src/arrow/array/util.cc | 42 +++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index 832a086314a..099dcb272f8 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -535,41 +535,43 @@ class ImmutableNullArrayFactory { // mutable version of ImmutableNullArrayFactory class NullArrayFactory { - public: - NullArrayFactory(MemoryPool* pool, const std::shared_ptr& type, - int64_t length) - : pool_(pool), type_(type), length_(length) {} - - Result> Create() { - std::vector> child_data(type_->num_fields()); - ARROW_ASSIGN_OR_RAISE(auto validity, CreateZeroBitBuffer(1)); - out_ = ArrayData::Make(type_, length_, {validity}, child_data, length_, 0); - RETURN_NOT_OK(VisitTypeInline(*type_, this)); - return out_; - } - - Result> CreateZeroByteBuffer(size_t scalar_size_bytes) { + private: + Result> CreateZeroByteBuffer(size_t scalar_size_bytes) const { ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBuffer(length_ * scalar_size_bytes, pool_)); std::memset(buffer->mutable_data(), 0, buffer->size()); - return buffer; + return std::shared_ptr(std::move(buffer)); + ; } - Result> CreateZeroOffsetBuffer(size_t index_size_bytes) { + Result> CreateZeroOffsetBuffer(size_t index_size_bytes) const { ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBuffer((length_ + 1) * index_size_bytes, pool_)); std::memset(buffer->mutable_data(), 0, buffer->size()); - return buffer; + return std::shared_ptr(std::move(buffer)); } - Result> CreateZeroBitBuffer(size_t scalar_size_bits) { + Result> CreateZeroBitBuffer(size_t scalar_size_bits) const { ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBuffer((length_ * scalar_size_bits + 7) / 8, pool_)); std::memset(buffer->mutable_data(), 0, buffer->size()); - return buffer; + return std::shared_ptr(std::move(buffer)); } - Result> CreateEmptyBuffer() { return AllocateBuffer(0); } + static Result> CreateEmptyBuffer() { return AllocateBuffer(0); } + + public: + NullArrayFactory(MemoryPool* pool, const std::shared_ptr& type, + int64_t length) + : pool_(pool), type_(type), length_(length) {} + + Result> Create() { + std::vector> child_data(type_->num_fields()); + ARROW_ASSIGN_OR_RAISE(auto validity, CreateZeroBitBuffer(1)); + out_ = ArrayData::Make(type_, length_, {validity}, child_data, length_, 0); + RETURN_NOT_OK(VisitTypeInline(*type_, this)); + return out_; + } Status Visit(const NullType&) { out_->buffers.resize(1, nullptr); From d181c1d8e447e7447e90f3a6aa821fe5822f9123 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 19 Jan 2022 19:11:17 +0100 Subject: [PATCH 06/25] Fix missing pointer dereference --- cpp/src/arrow/memory_pool.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index dc973d0af7d..ace7a846405 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -623,7 +623,7 @@ class BaseMemoryPoolImpl : public MemoryPool { } #endif RETURN_NOT_OK(Allocate(size, out)); - std::memset(out, 0, size); + std::memset(*out, 0, size); return Status::OK(); } From 2d5064c05464a370555f3117b2e51ae5c32fff0d Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 19 Jan 2022 19:13:57 +0100 Subject: [PATCH 07/25] Refactor NULLPTR to nullptr in .cc files --- cpp/src/arrow/memory_pool.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index ace7a846405..0cfdaac4794 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -613,7 +613,7 @@ class BaseMemoryPoolImpl : public MemoryPool { #ifdef ARROW_MMAP_FOR_IMMUTABLE_ZEROS if (size > 0) { *out = static_cast(mmap( - NULLPTR, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0)); + nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0)); if (*out == MAP_FAILED) { auto err = errno; return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ", @@ -663,15 +663,15 @@ class BaseMemoryPoolImpl : public MemoryPool { // calls with slightly larger sizes. Fall back to the requested size if // this fails. if (!current_buffer || current_buffer->size() < size) { - uint8_t* data = NULLPTR; + uint8_t* data = nullptr; int64_t alloc_size; if (current_buffer && size < current_buffer->size() * 2) { alloc_size = current_buffer->size() * 2; if (!AllocateImmutableZeros(alloc_size, &data).ok()) { - data = NULLPTR; + data = nullptr; } } - if (data == NULLPTR) { + if (data == nullptr) { alloc_size = size; RETURN_NOT_OK(AllocateImmutableZeros(alloc_size, &data)); } From 700f5c907701674783ae189e1ec0fdf35e633173 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 19 Jan 2022 19:20:41 +0100 Subject: [PATCH 08/25] Allow mmap-based optimization for immutable zeros to be disabled using a macro --- cpp/src/arrow/memory_pool.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 0cfdaac4794..3b1183e1ae8 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -30,10 +30,12 @@ #include #endif +#ifndef ARROW_DISABLE_MMAP_FOR_IMMUTABLE_ZEROS #ifdef __linux__ -#define ARROW_MMAP_FOR_IMMUTABLE_ZEROS +#define USE_MMAP_FOR_IMMUTABLE_ZEROS #include #endif +#endif #include "arrow/buffer.h" #include "arrow/io/util_internal.h" @@ -610,7 +612,7 @@ class BaseMemoryPoolImpl : public MemoryPool { protected: virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) { -#ifdef ARROW_MMAP_FOR_IMMUTABLE_ZEROS +#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS if (size > 0) { *out = static_cast(mmap( nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0)); @@ -628,7 +630,7 @@ class BaseMemoryPoolImpl : public MemoryPool { } void FreeImmutableZeros(uint8_t* buffer, int64_t size) override { -#ifdef ARROW_MMAP_FOR_IMMUTABLE_ZEROS +#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS if (size > 0) { munmap(buffer, size); return; From 5dc66effa3263bd2bba1560aea4a200f1b806df3 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 19 Jan 2022 21:27:42 +0100 Subject: [PATCH 09/25] Fix typo/lint violation --- cpp/src/arrow/array/util.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index 099dcb272f8..aa6515788b9 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -541,7 +541,6 @@ class NullArrayFactory { AllocateBuffer(length_ * scalar_size_bytes, pool_)); std::memset(buffer->mutable_data(), 0, buffer->size()); return std::shared_ptr(std::move(buffer)); - ; } Result> CreateZeroOffsetBuffer(size_t index_size_bytes) const { From bd3810c638c26c5c1d8145724509d1ff17e8ac2f Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 9 Feb 2022 16:43:01 +0100 Subject: [PATCH 10/25] Fix docstrings to align with earlier refactor --- cpp/src/arrow/array/util.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/array/util.h b/cpp/src/arrow/array/util.h index 5d7e0122827..f32d28f5859 100644 --- a/cpp/src/arrow/array/util.h +++ b/cpp/src/arrow/array/util.h @@ -49,7 +49,7 @@ Result> MakeMutableArrayOfNull( /// \brief Create a strongly-typed immutable Array instance with all elements null /// /// This function may reuse a single zero buffer, but may also defer to -/// MakeArrayOfNull(). +/// MakeMutableArrayOfNull(). /// /// \param[in] type the array type /// \param[in] length the array length @@ -71,7 +71,7 @@ Result> MakeMutableArrayFromScalar( /// \brief Create an immutable Array instance whose slots are set to the given scalar /// /// This function may reuse buffers if they contain the same (repeated) value to save -/// memory, but may also defer to MakeArrayFromScalar(). +/// memory, but may also defer to MakeMutableArrayFromScalar(). /// /// \param[in] scalar the value with which to fill the array /// \param[in] length the array length From 498d12fd99dc62d4e7713699f5b8b8667a3c21f1 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 9 Feb 2022 16:47:14 +0100 Subject: [PATCH 11/25] Refactor @param to \param --- cpp/src/arrow/memory_pool.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h index 0815d63843b..47db8595d6e 100644 --- a/cpp/src/arrow/memory_pool.h +++ b/cpp/src/arrow/memory_pool.h @@ -117,8 +117,8 @@ class ARROW_EXPORT MemoryPool { /// Free an allocated region. /// - /// @param buffer Pointer to the start of the allocated memory region - /// @param size Allocated size located at buffer. An allocator implementation + /// \param buffer Pointer to the start of the allocated memory region + /// \param size Allocated size located at buffer. An allocator implementation /// may use this for tracking the amount of allocated bytes as well as for /// faster deallocation if supported by its backend. virtual void Free(uint8_t* buffer, int64_t size) = 0; @@ -160,8 +160,8 @@ class ARROW_EXPORT MemoryPool { /// Free a memory region allocated by GetImmutableZeros(). /// - /// @param buffer Pointer to the start of the allocated memory region - /// @param size Allocated size located at buffer. An allocator implementation + /// \param buffer Pointer to the start of the allocated memory region + /// \param size Allocated size located at buffer. An allocator implementation /// may use this for tracking the amount of allocated bytes as well as for /// faster deallocation if supported by its backend. virtual void FreeImmutableZeros(uint8_t* buffer, int64_t size); From 9d055d1579027120ee29a7a2779066f46fc147f0 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 9 Feb 2022 16:52:45 +0100 Subject: [PATCH 12/25] Use BytesForBits utility function --- cpp/src/arrow/array/util.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index aa6515788b9..5956c27dff5 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -551,8 +551,9 @@ class NullArrayFactory { } Result> CreateZeroBitBuffer(size_t scalar_size_bits) const { - ARROW_ASSIGN_OR_RAISE(auto buffer, - AllocateBuffer((length_ * scalar_size_bits + 7) / 8, pool_)); + ARROW_ASSIGN_OR_RAISE( + auto buffer, + AllocateBuffer(bit_util::BytesForBits(length_ * scalar_size_bits), pool_)); std::memset(buffer->mutable_data(), 0, buffer->size()); return std::shared_ptr(std::move(buffer)); } From 7ce1bc6012ba21d73bf271182157de720a2b5d1c Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 9 Feb 2022 17:06:29 +0100 Subject: [PATCH 13/25] Fix MakeMutableArrayOfNull not being fully mutable for nested types --- cpp/src/arrow/array/util.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index 5956c27dff5..7b3876e182f 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -669,7 +669,7 @@ class NullArrayFactory { Result> CreateChild(const DataType& type, int i, int64_t length) { - ImmutableNullArrayFactory child_factory(pool_, type.field(i)->type(), length); + NullArrayFactory child_factory(pool_, type.field(i)->type(), length); return child_factory.Create(); } From 905610625e95d273e259c581709c434f6d19c421 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 9 Feb 2022 17:16:09 +0100 Subject: [PATCH 14/25] Refactor ImmutableNullArrayFactory to NullArrayFactory and NullArrayFactory to MutableNullArrayFactory for consistency with public functions --- cpp/src/arrow/array/util.cc | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index 7b3876e182f..9582befa24b 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -319,7 +319,7 @@ namespace { // get the maximum buffer length required, then allocate a single zeroed buffer // to use anywhere a buffer is required -class ImmutableNullArrayFactory { +class NullArrayFactory { public: struct GetBufferLength { GetBufferLength(const std::shared_ptr& type, int64_t length) @@ -415,8 +415,8 @@ class ImmutableNullArrayFactory { int64_t length_, buffer_length_; }; - ImmutableNullArrayFactory(MemoryPool* pool, const std::shared_ptr& type, - int64_t length) + NullArrayFactory(MemoryPool* pool, const std::shared_ptr& type, + int64_t length) : pool_(pool), type_(type), length_(length) {} Status CreateBuffer() { @@ -521,7 +521,7 @@ class ImmutableNullArrayFactory { Result> CreateChild(const DataType& type, int i, int64_t length) { - ImmutableNullArrayFactory child_factory(pool_, type.field(i)->type(), length); + NullArrayFactory child_factory(pool_, type.field(i)->type(), length); child_factory.buffer_ = buffer_; return child_factory.Create(); } @@ -533,8 +533,8 @@ class ImmutableNullArrayFactory { std::shared_ptr buffer_; }; -// mutable version of ImmutableNullArrayFactory -class NullArrayFactory { +// mutable version of NullArrayFactory, i.e. one that doesn't reuse a single buffer +class MutableNullArrayFactory { private: Result> CreateZeroByteBuffer(size_t scalar_size_bytes) const { ARROW_ASSIGN_OR_RAISE(auto buffer, @@ -561,8 +561,8 @@ class NullArrayFactory { static Result> CreateEmptyBuffer() { return AllocateBuffer(0); } public: - NullArrayFactory(MemoryPool* pool, const std::shared_ptr& type, - int64_t length) + MutableNullArrayFactory(MemoryPool* pool, const std::shared_ptr& type, + int64_t length) : pool_(pool), type_(type), length_(length) {} Result> Create() { @@ -669,7 +669,7 @@ class NullArrayFactory { Result> CreateChild(const DataType& type, int i, int64_t length) { - NullArrayFactory child_factory(pool_, type.field(i)->type(), length); + MutableNullArrayFactory child_factory(pool_, type.field(i)->type(), length); return child_factory.Create(); } @@ -924,14 +924,13 @@ class RepeatedArrayFactory { Result> MakeMutableArrayOfNull( const std::shared_ptr& type, int64_t length, MemoryPool* pool) { - ARROW_ASSIGN_OR_RAISE(auto data, NullArrayFactory(pool, type, length).Create()); + ARROW_ASSIGN_OR_RAISE(auto data, MutableNullArrayFactory(pool, type, length).Create()); return MakeArray(data); } Result> MakeArrayOfNull(const std::shared_ptr& type, int64_t length, MemoryPool* pool) { - ARROW_ASSIGN_OR_RAISE(auto data, - ImmutableNullArrayFactory(pool, type, length).Create()); + ARROW_ASSIGN_OR_RAISE(auto data, NullArrayFactory(pool, type, length).Create()); return MakeArray(data); } From 81aa0786818d1a5b12035dec4e390b33f0bb70aa Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Thu, 10 Feb 2022 14:12:51 +0100 Subject: [PATCH 15/25] Hide ImmutableZeros class using Buffer interface, extend abstractions to MemoryManager interface --- cpp/src/arrow/device.cc | 79 ++++++++++++++++++++++++++++++++++-- cpp/src/arrow/device.h | 62 ++++++++++++++++++++++++++++ cpp/src/arrow/memory_pool.cc | 54 +++++++++++++++++++----- cpp/src/arrow/memory_pool.h | 64 +++++++++-------------------- 4 files changed, 201 insertions(+), 58 deletions(-) diff --git a/cpp/src/arrow/device.cc b/cpp/src/arrow/device.cc index d042e3445c5..195b46e08a5 100644 --- a/cpp/src/arrow/device.cc +++ b/cpp/src/arrow/device.cc @@ -43,6 +43,8 @@ Device::~Device() {} return maybe_buffer; \ } +bool MemoryManager::is_mutable() const { return true; } + Result> MemoryManager::CopyBuffer( const std::shared_ptr& buf, const std::shared_ptr& to) { const auto& from = buf->memory_manager(); @@ -161,10 +163,10 @@ Result> CPUMemoryManager::ViewBufferFrom( Result> CPUMemoryManager::CopyBufferTo( const std::shared_ptr& buf, const std::shared_ptr& to) { - if (!to->is_cpu()) { + if (!to->is_cpu() || !to->is_mutable()) { return nullptr; } - ARROW_ASSIGN_OR_RAISE(auto dest, ::arrow::AllocateBuffer(buf->size(), pool_)); + ARROW_ASSIGN_OR_RAISE(auto dest, to->AllocateBuffer(buf->size())); if (buf->size() > 0) { memcpy(dest->mutable_data(), buf->data(), static_cast(buf->size())); } @@ -173,7 +175,7 @@ Result> CPUMemoryManager::CopyBufferTo( Result> CPUMemoryManager::ViewBufferTo( const std::shared_ptr& buf, const std::shared_ptr& to) { - if (!to->is_cpu()) { + if (!to->is_cpu() || !to->is_mutable()) { return nullptr; } return buf; @@ -185,6 +187,72 @@ std::shared_ptr default_cpu_memory_manager() { return instance; } +std::shared_ptr CPUImmutableZerosMemoryManager::Make( + const std::shared_ptr& device, MemoryPool* pool) { + return std::shared_ptr(new CPUImmutableZerosMemoryManager(device, pool)); +} + +Result> +CPUImmutableZerosMemoryManager::GetBufferReader(std::shared_ptr buf) { + return std::make_shared(std::move(buf)); +} + +Result> CPUImmutableZerosMemoryManager::GetBufferWriter( + std::shared_ptr buf) { + return Status::NotImplemented("cannot write to immutable zero buffers"); +} + +Result> CPUImmutableZerosMemoryManager::AllocateBuffer( + int64_t size) { + return ::arrow::MakeBufferOfZeros(size, pool_); +} + +Result> CPUImmutableZerosMemoryManager::CopyBufferFrom( + const std::shared_ptr& buf, const std::shared_ptr& from) { + // Cannot write to immutable zero buffers + return nullptr; +} + +Result> CPUImmutableZerosMemoryManager::ViewBufferFrom( + const std::shared_ptr& buf, const std::shared_ptr& from) { + // Can only guarantee source buffer satisfies our conditions if we're also the + // source memory manager + if (from.get() != this) { + return nullptr; + } + return buf; +} + +Result> CPUImmutableZerosMemoryManager::CopyBufferTo( + const std::shared_ptr& buf, const std::shared_ptr& to) { + if (!to->is_cpu() || !to->is_mutable()) { + return nullptr; + } + ARROW_ASSIGN_OR_RAISE(auto dest, to->AllocateBuffer(buf->size())); + if (buf->size() > 0) { + // Note: we can memset to zero instead of doing a memcpy, because we already + // know that the source buffer is all zeros + memset(dest->mutable_data(), 0, static_cast(buf->size())); + } + return std::move(dest); +} + +Result> CPUImmutableZerosMemoryManager::ViewBufferTo( + const std::shared_ptr& buf, const std::shared_ptr& to) { + // Can only view this buffer if destination memory manager is either mutable or + // generates buffers with the same immutable pattern that we do + if (!to->is_cpu() || !(to->is_mutable() || to.get() == this)) { + return nullptr; + } + return buf; +} + +std::shared_ptr default_cpu_immutable_zeros_memory_manager() { + static auto instance = + CPUImmutableZerosMemoryManager::Make(CPUDevice::Instance(), default_memory_pool()); + return instance; +} + std::shared_ptr CPUDevice::Instance() { static auto instance = std::shared_ptr(new CPUDevice()); return instance; @@ -202,6 +270,11 @@ std::shared_ptr CPUDevice::memory_manager(MemoryPool* pool) { return CPUMemoryManager::Make(Instance(), pool); } +std::shared_ptr CPUDevice::immutable_zeros_memory_manager( + MemoryPool* pool) { + return CPUImmutableZerosMemoryManager::Make(Instance(), pool); +} + std::shared_ptr CPUDevice::default_memory_manager() { return default_cpu_memory_manager(); } diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index 373a8a5db1f..e6ca350eee9 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -96,6 +96,14 @@ class ARROW_EXPORT MemoryManager : public std::enable_shared_from_thisis_cpu(); } + /// \brief Whether buffers managed by this MemoryManager are mutable. + /// + /// When this returns false, the contents of the buffers managed by this memory + /// manager by definition have some special significance, since only the manager + /// has control over what's in them. For example, this may be that the contents + /// are always zero, satisfy some pattern, etc. + virtual bool is_mutable() const; + /// \brief Create a RandomAccessFile to read a particular buffer. /// /// The given buffer must be tied to this MemoryManager. @@ -173,6 +181,11 @@ class ARROW_EXPORT CPUDevice : public Device { /// The returned MemoryManager will use the given MemoryPool for allocations. static std::shared_ptr memory_manager(MemoryPool* pool); + /// \brief Create a MemoryManager for immutable zero buffers + /// + /// The returned MemoryManager will use the given MemoryPool for allocations. + static std::shared_ptr immutable_zeros_memory_manager(MemoryPool* pool); + protected: CPUDevice() : Device(true) {} }; @@ -223,4 +236,53 @@ class ARROW_EXPORT CPUMemoryManager : public MemoryManager { ARROW_EXPORT std::shared_ptr default_cpu_memory_manager(); +/// A memory manager that uses the immutable zeros interface of the given memory pool, +/// rather than the normal mutable buffer interface. +class ARROW_EXPORT CPUImmutableZerosMemoryManager : public MemoryManager { + public: + Result> GetBufferReader( + std::shared_ptr buf) override; + Result> GetBufferWriter( + std::shared_ptr buf) override; + + Result> AllocateBuffer(int64_t size) override; + + /// \brief Return the MemoryPool associated with this MemoryManager. + MemoryPool* pool() const { return pool_; } + + protected: + CPUImmutableZerosMemoryManager(const std::shared_ptr& device, MemoryPool* pool) + : MemoryManager(device), pool_(pool) {} + + static std::shared_ptr Make(const std::shared_ptr& device, + MemoryPool* pool = default_memory_pool()); + + Result> CopyBufferFrom( + const std::shared_ptr& buf, + const std::shared_ptr& from) override; + Result> CopyBufferTo( + const std::shared_ptr& buf, + const std::shared_ptr& to) override; + Result> ViewBufferFrom( + const std::shared_ptr& buf, + const std::shared_ptr& from) override; + Result> ViewBufferTo( + const std::shared_ptr& buf, + const std::shared_ptr& to) override; + + MemoryPool* pool_; + + friend std::shared_ptr CPUDevice::immutable_zeros_memory_manager( + MemoryPool* pool); + friend ARROW_EXPORT std::shared_ptr + default_cpu_immutable_zeros_memory_manager(); +}; + +/// \brief Return the default CPU MemoryManager instance for allocating immutable blocks +/// of zeros +/// +/// The returned singleton instance uses the default MemoryPool. +ARROW_EXPORT +std::shared_ptr default_cpu_immutable_zeros_memory_manager(); + } // namespace arrow diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 3b1183e1ae8..b4869c0d7c8 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -543,6 +543,39 @@ int64_t MemoryPool::max_memory() const { return -1; } // MemoryPool implementation that delegates its core duty // to an Allocator class. +class ImmutableZeros : public Buffer { + public: + explicit ImmutableZeros(uint8_t* data, int64_t size, MemoryPool* pool) + : Buffer(data, size, CPUDevice::immutable_zeros_memory_manager(pool)), + pool_(pool) {} + + ImmutableZeros() : Buffer(nullptr, 0), pool_(nullptr){}; + + ~ImmutableZeros() override; + + // Prevent copies and handle moves explicitly to avoid double free + ImmutableZeros(const ImmutableZeros&) = delete; + ImmutableZeros& operator=(const ImmutableZeros&) = delete; + + ImmutableZeros(ImmutableZeros&& other) noexcept + : Buffer(other.data_, other.size_, other.memory_manager()), pool_(other.pool_) { + other.pool_ = nullptr; + other.data_ = nullptr; + other.size_ = 0; + } + + ImmutableZeros& operator=(ImmutableZeros&& other) noexcept { + SetMemoryManager(other.memory_manager()); + std::swap(pool_, other.pool_); + std::swap(data_, other.data_); + std::swap(size_, other.size_); + return *this; + } + + private: + MemoryPool* pool_ = nullptr; +}; + #ifndef NDEBUG static constexpr uint8_t kAllocPoison = 0xBC; static constexpr uint8_t kReallocPoison = 0xBD; @@ -640,7 +673,7 @@ class BaseMemoryPoolImpl : public MemoryPool { } public: - Result> GetImmutableZeros(int64_t size) override { + Result> GetImmutableZeros(int64_t size) override { // Thread-safely get the current largest buffer of zeros. std::shared_ptr current_buffer; { @@ -821,7 +854,7 @@ static struct GlobalState { #endif } global_state; -MemoryPool::ImmutableZeros::~ImmutableZeros() { +ImmutableZeros::~ImmutableZeros() { // Avoid calling pool_->FreeImmutableZeros if the global pools are destroyed // (XXX this will not work with user-defined pools) @@ -829,12 +862,11 @@ MemoryPool::ImmutableZeros::~ImmutableZeros() { // after memory pools are destructed on the main thread (as there is // no guarantee of destructor order between thread/memory pools) if (data_ && !global_state.is_finalizing()) { - pool_->FreeImmutableZeros(data_, size_); + pool_->FreeImmutableZeros(const_cast(data_), size_); } } -Result> MemoryPool::GetImmutableZeros( - int64_t size) { +Result> MemoryPool::GetImmutableZeros(int64_t size) { uint8_t* data; RETURN_NOT_OK(Allocate(size, &data)); std::memset(data, 0, size); @@ -1092,8 +1124,8 @@ class PoolBuffer final : public ResizableBuffer { /// MemoryPool class ImmutableZerosPoolBuffer final : public Buffer { public: - explicit ImmutableZerosPoolBuffer(std::shared_ptr&& zeros, - int64_t size, std::shared_ptr&& mm) + explicit ImmutableZerosPoolBuffer(std::shared_ptr&& zeros, int64_t size, + std::shared_ptr&& mm) : Buffer(zeros->data(), size, std::move(mm)), zeros_(std::move(zeros)) {} static Result> Make(int64_t size, @@ -1101,9 +1133,9 @@ class ImmutableZerosPoolBuffer final : public Buffer { std::shared_ptr mm; if (pool == nullptr) { pool = default_memory_pool(); - mm = default_cpu_memory_manager(); + mm = default_cpu_immutable_zeros_memory_manager(); } else { - mm = CPUDevice::memory_manager(pool); + mm = CPUDevice::immutable_zeros_memory_manager(pool); } ARROW_ASSIGN_OR_RAISE(auto zeros, pool->GetImmutableZeros(size)); return std::unique_ptr( @@ -1111,7 +1143,9 @@ class ImmutableZerosPoolBuffer final : public Buffer { } private: - std::shared_ptr zeros_; + // Note: we don't use this value directly; however, it needs to be here to keep a + // reference to the shared_ptr to keep the underlying zero buffer alive. + std::shared_ptr zeros_; }; namespace { diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h index 47db8595d6e..97f6568aa61 100644 --- a/cpp/src/arrow/memory_pool.h +++ b/cpp/src/arrow/memory_pool.h @@ -63,42 +63,6 @@ class MemoryPoolStats { /// take care of the required 64-byte alignment. class ARROW_EXPORT MemoryPool { public: - class ARROW_EXPORT ImmutableZeros { - public: - explicit ImmutableZeros(uint8_t* data, int64_t size, MemoryPool* pool) - : pool_(pool), data_(data), size_(size) {} - - ImmutableZeros() = default; - - ~ImmutableZeros(); - - // Prevent copies and handle moves explicitly to avoid double free - ImmutableZeros(const ImmutableZeros&) = delete; - ImmutableZeros& operator=(const ImmutableZeros&) = delete; - - ImmutableZeros(ImmutableZeros&& other) noexcept - : pool_(other.pool_), data_(other.data_), size_(other.size_) { - other.pool_ = NULLPTR; - other.data_ = NULLPTR; - other.size_ = 0; - } - - ImmutableZeros& operator=(ImmutableZeros&& other) noexcept { - std::swap(data_, other.data_); - std::swap(pool_, other.pool_); - std::swap(size_, other.size_); - return *this; - } - - const uint8_t* data() const { return data_; } - int64_t size() const { return size_; } - - private: - MemoryPool* pool_ = NULLPTR; - uint8_t* data_ = NULLPTR; - int64_t size_ = 0; - }; - virtual ~MemoryPool() = default; /// \brief EXPERIMENTAL. Create a new instance of the default MemoryPool @@ -123,17 +87,25 @@ class ARROW_EXPORT MemoryPool { /// faster deallocation if supported by its backend. virtual void Free(uint8_t* buffer, int64_t size) = 0; - /// Return a block of immutable zero bytes of at least the given size. + /// Return a shared block of immutable zero bytes of at least the given size. + /// + /// These blocks are useful when some other readable block of data is needed + /// to comply with an interface, but the contents of the block don't matter + /// and/or should just be zero. Unlike other allocations, the same underlying + /// block of memory can be shared between all users (for a particular size + /// limit), thus reducing memory footprint. In fact, some implementations do + /// not require allocation of physical memory at all, and instead rely on + /// MMU tricks to create arbitrarily large blocks of zeroed virtual memory. /// - /// These blocks are useful when a buffer, array, or some other readable block - /// of data is needed to comply with an interface, but the contents of the - /// block don't matter and/or can just be zero. Unlike other allocations, the - /// same underlying block of memory can be shared between all users (for a - /// particular size limit), thus reducing memory footprint. + /// The allocated region shall be 64-byte aligned. The region will be + /// deallocated automatically when all shared_ptrs to the region are + /// destroyed. /// - /// The allocated region shall be 64-byte aligned. A region will be - /// deallocated when all shared_ptrs to the region are destroyed. - virtual Result> GetImmutableZeros(int64_t size); + /// While this returns a shared_ptr to a Buffer, you may want to use the + /// MakeBufferOfZeros() function, analogous to AllocateBuffer(). This + /// function returns a Buffer that advertises exactly the requested size, + /// and that also embeds/hides the shared ownership of the underlying buffer. + virtual Result> GetImmutableZeros(int64_t size); /// Return unused memory to the OS /// @@ -165,6 +137,8 @@ class ARROW_EXPORT MemoryPool { /// may use this for tracking the amount of allocated bytes as well as for /// faster deallocation if supported by its backend. virtual void FreeImmutableZeros(uint8_t* buffer, int64_t size); + + friend class ImmutableZeros; }; class ARROW_EXPORT LoggingMemoryPool : public MemoryPool { From 3a10fca40fc9123482353a1ecba4b408bc720bd7 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Mon, 14 Feb 2022 11:48:55 +0100 Subject: [PATCH 16/25] Fix lint violation due to typo --- cpp/src/arrow/memory_pool.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index b4869c0d7c8..68743981b71 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -549,7 +549,7 @@ class ImmutableZeros : public Buffer { : Buffer(data, size, CPUDevice::immutable_zeros_memory_manager(pool)), pool_(pool) {} - ImmutableZeros() : Buffer(nullptr, 0), pool_(nullptr){}; + ImmutableZeros() : Buffer(nullptr, 0), pool_(nullptr) {} ~ImmutableZeros() override; From a4c3413cfbd16f6b03c0451b9e199abb16f145c9 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Mon, 14 Feb 2022 14:44:29 +0100 Subject: [PATCH 17/25] Fix possibly custom memory pool not used for creation of buffer --- cpp/src/arrow/array/util.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index 9582befa24b..a81e2a69dea 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -558,7 +558,7 @@ class MutableNullArrayFactory { return std::shared_ptr(std::move(buffer)); } - static Result> CreateEmptyBuffer() { return AllocateBuffer(0); } + Result> CreateEmptyBuffer() { return AllocateBuffer(0, pool_); } public: MutableNullArrayFactory(MemoryPool* pool, const std::shared_ptr& type, From 41ebefc321bff0cfc1fdf6080f7e8a8d7ac665e7 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Mon, 14 Feb 2022 15:09:33 +0100 Subject: [PATCH 18/25] Add missing is_mutable() override --- cpp/src/arrow/device.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index e6ca350eee9..4f6a1c37fbb 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -240,6 +240,8 @@ std::shared_ptr default_cpu_memory_manager(); /// rather than the normal mutable buffer interface. class ARROW_EXPORT CPUImmutableZerosMemoryManager : public MemoryManager { public: + bool is_mutable() const override { return false; }; + Result> GetBufferReader( std::shared_ptr buf) override; Result> GetBufferWriter( From e62868870712ba6fe934760413bc764868b99e13 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Tue, 15 Feb 2022 11:25:09 +0100 Subject: [PATCH 19/25] Rewrite immutable zeros allocation to: - use atomic_load/atomic_store for shared access to the cache, rather than using a mutex - allocate at least 4096 bytes for the first allocation - be generally more readable --- cpp/src/arrow/memory_pool.cc | 95 +++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 45 deletions(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 68743981b71..060a3b01f6a 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -675,51 +675,54 @@ class BaseMemoryPoolImpl : public MemoryPool { public: Result> GetImmutableZeros(int64_t size) override { // Thread-safely get the current largest buffer of zeros. - std::shared_ptr current_buffer; - { - std::lock_guard ga(immutable_zeros_access_mutex_); - current_buffer = immutable_zeros_cache_; + auto current_buffer = atomic_load(&immutable_zeros_cache_); + + // If this buffer satisfies the requirements, return it. + if (current_buffer && current_buffer->size() >= size) { + return std::move(current_buffer); } - // If no buffer exists yet or the current buffer isn't large enough, - // acquire the mutex for growing the buffer, then try again (another thread - // may have beat us to it and have grown the buffer already while we're - // waiting for the lock). - if (!current_buffer || current_buffer->size() < size) { - std::lock_guard gg(immutable_zeros_grow_mutex_); - { - std::lock_guard ga(immutable_zeros_access_mutex_); - current_buffer = immutable_zeros_cache_; - } + // Acquire the lock for allocating a new buffer. + std::lock_guard gg(immutable_zeros_mutex_); - // If the buffer is still not large enough, heuristically allocate a - // larger buffer. Try to allocate at least twice the size of the current - // buffer first, to prevent allocating lots of buffers for subsequent - // calls with slightly larger sizes. Fall back to the requested size if - // this fails. - if (!current_buffer || current_buffer->size() < size) { - uint8_t* data = nullptr; - int64_t alloc_size; - if (current_buffer && size < current_buffer->size() * 2) { - alloc_size = current_buffer->size() * 2; - if (!AllocateImmutableZeros(alloc_size, &data).ok()) { - data = nullptr; - } - } - if (data == nullptr) { - alloc_size = size; - RETURN_NOT_OK(AllocateImmutableZeros(alloc_size, &data)); - } - current_buffer = std::make_shared(data, alloc_size, this); - - // Save the new buffer for other threads to use. - { - std::lock_guard ga(immutable_zeros_access_mutex_); - immutable_zeros_cache_ = current_buffer; - } - } + // Between our previous atomic load and acquisition of the lock, another + // thread may have allocated a buffer. So we need to check again. + current_buffer = atomic_load(&immutable_zeros_cache_); + if (current_buffer && current_buffer->size() >= size) { + return std::move(current_buffer); } + // Let's now figure out a good size to allocate. This is done + // heuristically, with the following rules: + // - allocate at least the requested size (obviously); + // - allocate at least 2x the previous size; + // - allocate at least MIN_ALLOC_SIZE bytes (to avoid lots of small + // allocations). + static const int64_t MIN_ALLOC_SIZE = 4096; + int64_t alloc_size = + std::max(size, current_buffer ? (current_buffer->size() * 2) : MIN_ALLOC_SIZE); + + // Attempt to allocate the block. + uint8_t* data = nullptr; + auto result = AllocateImmutableZeros(alloc_size, &data); + + // If we fail to do so, fall back to trying to allocate the requested size + // exactly as a last-ditch effort. + if (!result.ok() || data == nullptr) { + alloc_size = size; + RETURN_NOT_OK(AllocateImmutableZeros(alloc_size, &data)); + } + DCHECK_NE(data, nullptr); + + // Move ownership of the data block into an ImmutableZeros object. It will + // free the block when destroyed, i.e. when all shared_ptr references to it + // are reset or go out of scope. + current_buffer = std::make_shared(data, alloc_size, this); + + // Store a reference to the new block in the cache, so subsequent calls to + // this function (from this thread or from other threads) can use it, too. + atomic_store(&immutable_zeros_cache_, current_buffer); + return std::move(current_buffer); } @@ -728,9 +731,12 @@ class BaseMemoryPoolImpl : public MemoryPool { // there are other pieces of code using it, getting rid of the cache won't // deallocate it anyway, so it's better to hold onto it. { - std::lock_guard ga(immutable_zeros_access_mutex_); - if (immutable_zeros_cache_.use_count() <= 1) { - immutable_zeros_cache_.reset(); + auto cache = atomic_load(&immutable_zeros_cache_); + + // Because we now have a copy in our thread, the use count will be 2 if + // nothing else is using it. + if (cache.use_count() <= 2) { + atomic_store(&immutable_zeros_cache_, std::shared_ptr()); } } @@ -744,8 +750,7 @@ class BaseMemoryPoolImpl : public MemoryPool { protected: internal::MemoryPoolStats stats_; std::shared_ptr immutable_zeros_cache_; - std::mutex immutable_zeros_access_mutex_; - std::mutex immutable_zeros_grow_mutex_; + std::mutex immutable_zeros_mutex_; }; class SystemMemoryPool : public BaseMemoryPoolImpl { From cbf33dedb391fccd0bd1f162082457a73c2fc2d3 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Tue, 15 Feb 2022 11:46:33 +0100 Subject: [PATCH 20/25] Add TODO for using dedicated zero-initialized alloc functions of allocation backends --- cpp/src/arrow/memory_pool.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 060a3b01f6a..97ec8008e04 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -657,6 +657,8 @@ class BaseMemoryPoolImpl : public MemoryPool { return Status::OK(); } #endif + // TODO: jemalloc and mimalloc support zero-initialized allocations as + // well, which might be faster than allocate + memset. RETURN_NOT_OK(Allocate(size, out)); std::memset(*out, 0, size); return Status::OK(); From d89c297ed90c037721b8279405b97a0852b62031 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Tue, 15 Feb 2022 11:52:35 +0100 Subject: [PATCH 21/25] Rename constant to satisfy naming conventions --- cpp/src/arrow/memory_pool.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 97ec8008e04..136212f8473 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -698,11 +698,11 @@ class BaseMemoryPoolImpl : public MemoryPool { // heuristically, with the following rules: // - allocate at least the requested size (obviously); // - allocate at least 2x the previous size; - // - allocate at least MIN_ALLOC_SIZE bytes (to avoid lots of small + // - allocate at least kMinAllocSize bytes (to avoid lots of small // allocations). - static const int64_t MIN_ALLOC_SIZE = 4096; + static const int64_t kMinAllocSize = 4096; int64_t alloc_size = - std::max(size, current_buffer ? (current_buffer->size() * 2) : MIN_ALLOC_SIZE); + std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize); // Attempt to allocate the block. uint8_t* data = nullptr; From 3dda47402dd57228e0b46f8ba3ef4409b3b3ee46 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Tue, 15 Feb 2022 19:16:09 +0100 Subject: [PATCH 22/25] Remove unnecessary null check --- cpp/src/arrow/memory_pool.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 136212f8473..1cb86268a02 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -710,7 +710,7 @@ class BaseMemoryPoolImpl : public MemoryPool { // If we fail to do so, fall back to trying to allocate the requested size // exactly as a last-ditch effort. - if (!result.ok() || data == nullptr) { + if (!result.ok()) { alloc_size = size; RETURN_NOT_OK(AllocateImmutableZeros(alloc_size, &data)); } From 37388192f12c35691ffddf111b14e8b4f284c08f Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Tue, 15 Feb 2022 20:14:50 +0100 Subject: [PATCH 23/25] Move OS-specific mmap logic for immutable zeros to io_util.[h|cc] --- cpp/src/arrow/memory_pool.cc | 33 ++++++++------------------------- cpp/src/arrow/util/io_util.cc | 35 +++++++++++++++++++++++++++++++++++ cpp/src/arrow/util/io_util.h | 5 +++++ 3 files changed, 48 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 1cb86268a02..141454dcc1f 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -30,13 +30,6 @@ #include #endif -#ifndef ARROW_DISABLE_MMAP_FOR_IMMUTABLE_ZEROS -#ifdef __linux__ -#define USE_MMAP_FOR_IMMUTABLE_ZEROS -#include -#endif -#endif - #include "arrow/buffer.h" #include "arrow/io/util_internal.h" #include "arrow/result.h" @@ -645,33 +638,23 @@ class BaseMemoryPoolImpl : public MemoryPool { protected: virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) { -#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS - if (size > 0) { - *out = static_cast(mmap( - nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0)); - if (*out == MAP_FAILED) { - auto err = errno; - return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ", - strerror(err)); - } - return Status::OK(); - } -#endif +#ifdef ARROW_DISABLE_MMAP_FOR_IMMUTABLE_ZEROS // TODO: jemalloc and mimalloc support zero-initialized allocations as // well, which might be faster than allocate + memset. RETURN_NOT_OK(Allocate(size, out)); std::memset(*out, 0, size); return Status::OK(); +#else + return internal::MemoryMapZeros(size, out); +#endif } void FreeImmutableZeros(uint8_t* buffer, int64_t size) override { -#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS - if (size > 0) { - munmap(buffer, size); - return; - } -#endif +#ifdef ARROW_DISABLE_MMAP_FOR_IMMUTABLE_ZEROS Free(buffer, size); +#else + internal::MemoryUnmapZeros(buffer, size); +#endif } public: diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 853e30081dc..ca4491fbfee 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -1138,6 +1138,41 @@ Status MemoryAdviseWillNeed(const std::vector& regions) { #endif } +// +// Compatible way for getting (large amounts of) immutable, zeroed memory +// + +Status MemoryMapZeros(size_t size, uint8_t** out) { +#ifdef __linux__ + *out = static_cast( + mmap(nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0)); + if (*out == MAP_FAILED) { + auto err = errno; + return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ", + strerror(err)); + } + return Status::OK(); +#else + // Fallback. + // TODO: can use VirtualAlloc for Windows + *out = static_cast(std::calloc(1, size)); + if (*out == nullptr) { + auto err = errno; + return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ", + strerror(err)); + } + return Status::OK(); +#endif +} + +void MemoryUnmapZeros(uint8_t* buffer, size_t size) { +#ifdef __linux__ + munmap(buffer, size); +#else + std::free(buffer); +#endif +} + // // Closing files // diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index c094727a647..04600355232 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -179,6 +179,11 @@ Status MemoryMapRemap(void* addr, size_t old_size, size_t new_size, int fildes, ARROW_EXPORT Status MemoryAdviseWillNeed(const std::vector& regions); +ARROW_EXPORT +Status MemoryMapZeros(size_t size, uint8_t** out); +ARROW_EXPORT +void MemoryUnmapZeros(uint8_t* buffer, size_t size); + ARROW_EXPORT Result GetEnvVar(const char* name); ARROW_EXPORT From dee486a8ede6aa6e96dbd53ae0d6a4abe950739c Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Wed, 16 Feb 2022 12:39:06 +0100 Subject: [PATCH 24/25] Use internal::atomic[load|store] for shared_ptr for compatibility --- cpp/src/arrow/memory_pool.cc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 141454dcc1f..d375f5bbfff 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -16,6 +16,7 @@ // under the License. #include "arrow/memory_pool.h" +#include "arrow/util/atomic_shared_ptr.h" #include // IWYU pragma: keep #include @@ -660,7 +661,7 @@ class BaseMemoryPoolImpl : public MemoryPool { public: Result> GetImmutableZeros(int64_t size) override { // Thread-safely get the current largest buffer of zeros. - auto current_buffer = atomic_load(&immutable_zeros_cache_); + auto current_buffer = internal::atomic_load(&immutable_zeros_cache_); // If this buffer satisfies the requirements, return it. if (current_buffer && current_buffer->size() >= size) { @@ -672,7 +673,7 @@ class BaseMemoryPoolImpl : public MemoryPool { // Between our previous atomic load and acquisition of the lock, another // thread may have allocated a buffer. So we need to check again. - current_buffer = atomic_load(&immutable_zeros_cache_); + current_buffer = internal::atomic_load(&immutable_zeros_cache_); if (current_buffer && current_buffer->size() >= size) { return std::move(current_buffer); } @@ -706,7 +707,7 @@ class BaseMemoryPoolImpl : public MemoryPool { // Store a reference to the new block in the cache, so subsequent calls to // this function (from this thread or from other threads) can use it, too. - atomic_store(&immutable_zeros_cache_, current_buffer); + internal::atomic_store(&immutable_zeros_cache_, current_buffer); return std::move(current_buffer); } @@ -716,12 +717,13 @@ class BaseMemoryPoolImpl : public MemoryPool { // there are other pieces of code using it, getting rid of the cache won't // deallocate it anyway, so it's better to hold onto it. { - auto cache = atomic_load(&immutable_zeros_cache_); + auto cache = internal::atomic_load(&immutable_zeros_cache_); // Because we now have a copy in our thread, the use count will be 2 if // nothing else is using it. if (cache.use_count() <= 2) { - atomic_store(&immutable_zeros_cache_, std::shared_ptr()); + internal::atomic_store(&immutable_zeros_cache_, + std::shared_ptr()); } } From 976dcf697a3773b2e0bd3323ebdbd871761e8178 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Thu, 17 Feb 2022 16:04:53 +0100 Subject: [PATCH 25/25] Revert addition of special memory manager for immutable zeros --- cpp/src/arrow/device.cc | 79 ++---------------------------------- cpp/src/arrow/device.h | 64 ----------------------------- cpp/src/arrow/memory_pool.cc | 7 ++-- 3 files changed, 6 insertions(+), 144 deletions(-) diff --git a/cpp/src/arrow/device.cc b/cpp/src/arrow/device.cc index 195b46e08a5..d042e3445c5 100644 --- a/cpp/src/arrow/device.cc +++ b/cpp/src/arrow/device.cc @@ -43,8 +43,6 @@ Device::~Device() {} return maybe_buffer; \ } -bool MemoryManager::is_mutable() const { return true; } - Result> MemoryManager::CopyBuffer( const std::shared_ptr& buf, const std::shared_ptr& to) { const auto& from = buf->memory_manager(); @@ -163,10 +161,10 @@ Result> CPUMemoryManager::ViewBufferFrom( Result> CPUMemoryManager::CopyBufferTo( const std::shared_ptr& buf, const std::shared_ptr& to) { - if (!to->is_cpu() || !to->is_mutable()) { + if (!to->is_cpu()) { return nullptr; } - ARROW_ASSIGN_OR_RAISE(auto dest, to->AllocateBuffer(buf->size())); + ARROW_ASSIGN_OR_RAISE(auto dest, ::arrow::AllocateBuffer(buf->size(), pool_)); if (buf->size() > 0) { memcpy(dest->mutable_data(), buf->data(), static_cast(buf->size())); } @@ -175,7 +173,7 @@ Result> CPUMemoryManager::CopyBufferTo( Result> CPUMemoryManager::ViewBufferTo( const std::shared_ptr& buf, const std::shared_ptr& to) { - if (!to->is_cpu() || !to->is_mutable()) { + if (!to->is_cpu()) { return nullptr; } return buf; @@ -187,72 +185,6 @@ std::shared_ptr default_cpu_memory_manager() { return instance; } -std::shared_ptr CPUImmutableZerosMemoryManager::Make( - const std::shared_ptr& device, MemoryPool* pool) { - return std::shared_ptr(new CPUImmutableZerosMemoryManager(device, pool)); -} - -Result> -CPUImmutableZerosMemoryManager::GetBufferReader(std::shared_ptr buf) { - return std::make_shared(std::move(buf)); -} - -Result> CPUImmutableZerosMemoryManager::GetBufferWriter( - std::shared_ptr buf) { - return Status::NotImplemented("cannot write to immutable zero buffers"); -} - -Result> CPUImmutableZerosMemoryManager::AllocateBuffer( - int64_t size) { - return ::arrow::MakeBufferOfZeros(size, pool_); -} - -Result> CPUImmutableZerosMemoryManager::CopyBufferFrom( - const std::shared_ptr& buf, const std::shared_ptr& from) { - // Cannot write to immutable zero buffers - return nullptr; -} - -Result> CPUImmutableZerosMemoryManager::ViewBufferFrom( - const std::shared_ptr& buf, const std::shared_ptr& from) { - // Can only guarantee source buffer satisfies our conditions if we're also the - // source memory manager - if (from.get() != this) { - return nullptr; - } - return buf; -} - -Result> CPUImmutableZerosMemoryManager::CopyBufferTo( - const std::shared_ptr& buf, const std::shared_ptr& to) { - if (!to->is_cpu() || !to->is_mutable()) { - return nullptr; - } - ARROW_ASSIGN_OR_RAISE(auto dest, to->AllocateBuffer(buf->size())); - if (buf->size() > 0) { - // Note: we can memset to zero instead of doing a memcpy, because we already - // know that the source buffer is all zeros - memset(dest->mutable_data(), 0, static_cast(buf->size())); - } - return std::move(dest); -} - -Result> CPUImmutableZerosMemoryManager::ViewBufferTo( - const std::shared_ptr& buf, const std::shared_ptr& to) { - // Can only view this buffer if destination memory manager is either mutable or - // generates buffers with the same immutable pattern that we do - if (!to->is_cpu() || !(to->is_mutable() || to.get() == this)) { - return nullptr; - } - return buf; -} - -std::shared_ptr default_cpu_immutable_zeros_memory_manager() { - static auto instance = - CPUImmutableZerosMemoryManager::Make(CPUDevice::Instance(), default_memory_pool()); - return instance; -} - std::shared_ptr CPUDevice::Instance() { static auto instance = std::shared_ptr(new CPUDevice()); return instance; @@ -270,11 +202,6 @@ std::shared_ptr CPUDevice::memory_manager(MemoryPool* pool) { return CPUMemoryManager::Make(Instance(), pool); } -std::shared_ptr CPUDevice::immutable_zeros_memory_manager( - MemoryPool* pool) { - return CPUImmutableZerosMemoryManager::Make(Instance(), pool); -} - std::shared_ptr CPUDevice::default_memory_manager() { return default_cpu_memory_manager(); } diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index 4f6a1c37fbb..373a8a5db1f 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -96,14 +96,6 @@ class ARROW_EXPORT MemoryManager : public std::enable_shared_from_thisis_cpu(); } - /// \brief Whether buffers managed by this MemoryManager are mutable. - /// - /// When this returns false, the contents of the buffers managed by this memory - /// manager by definition have some special significance, since only the manager - /// has control over what's in them. For example, this may be that the contents - /// are always zero, satisfy some pattern, etc. - virtual bool is_mutable() const; - /// \brief Create a RandomAccessFile to read a particular buffer. /// /// The given buffer must be tied to this MemoryManager. @@ -181,11 +173,6 @@ class ARROW_EXPORT CPUDevice : public Device { /// The returned MemoryManager will use the given MemoryPool for allocations. static std::shared_ptr memory_manager(MemoryPool* pool); - /// \brief Create a MemoryManager for immutable zero buffers - /// - /// The returned MemoryManager will use the given MemoryPool for allocations. - static std::shared_ptr immutable_zeros_memory_manager(MemoryPool* pool); - protected: CPUDevice() : Device(true) {} }; @@ -236,55 +223,4 @@ class ARROW_EXPORT CPUMemoryManager : public MemoryManager { ARROW_EXPORT std::shared_ptr default_cpu_memory_manager(); -/// A memory manager that uses the immutable zeros interface of the given memory pool, -/// rather than the normal mutable buffer interface. -class ARROW_EXPORT CPUImmutableZerosMemoryManager : public MemoryManager { - public: - bool is_mutable() const override { return false; }; - - Result> GetBufferReader( - std::shared_ptr buf) override; - Result> GetBufferWriter( - std::shared_ptr buf) override; - - Result> AllocateBuffer(int64_t size) override; - - /// \brief Return the MemoryPool associated with this MemoryManager. - MemoryPool* pool() const { return pool_; } - - protected: - CPUImmutableZerosMemoryManager(const std::shared_ptr& device, MemoryPool* pool) - : MemoryManager(device), pool_(pool) {} - - static std::shared_ptr Make(const std::shared_ptr& device, - MemoryPool* pool = default_memory_pool()); - - Result> CopyBufferFrom( - const std::shared_ptr& buf, - const std::shared_ptr& from) override; - Result> CopyBufferTo( - const std::shared_ptr& buf, - const std::shared_ptr& to) override; - Result> ViewBufferFrom( - const std::shared_ptr& buf, - const std::shared_ptr& from) override; - Result> ViewBufferTo( - const std::shared_ptr& buf, - const std::shared_ptr& to) override; - - MemoryPool* pool_; - - friend std::shared_ptr CPUDevice::immutable_zeros_memory_manager( - MemoryPool* pool); - friend ARROW_EXPORT std::shared_ptr - default_cpu_immutable_zeros_memory_manager(); -}; - -/// \brief Return the default CPU MemoryManager instance for allocating immutable blocks -/// of zeros -/// -/// The returned singleton instance uses the default MemoryPool. -ARROW_EXPORT -std::shared_ptr default_cpu_immutable_zeros_memory_manager(); - } // namespace arrow diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index d375f5bbfff..dc28f21bf0e 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -540,8 +540,7 @@ int64_t MemoryPool::max_memory() const { return -1; } class ImmutableZeros : public Buffer { public: explicit ImmutableZeros(uint8_t* data, int64_t size, MemoryPool* pool) - : Buffer(data, size, CPUDevice::immutable_zeros_memory_manager(pool)), - pool_(pool) {} + : Buffer(data, size, CPUDevice::memory_manager(pool)), pool_(pool) {} ImmutableZeros() : Buffer(nullptr, 0), pool_(nullptr) {} @@ -1125,9 +1124,9 @@ class ImmutableZerosPoolBuffer final : public Buffer { std::shared_ptr mm; if (pool == nullptr) { pool = default_memory_pool(); - mm = default_cpu_immutable_zeros_memory_manager(); + mm = default_cpu_memory_manager(); } else { - mm = CPUDevice::immutable_zeros_memory_manager(pool); + mm = CPUDevice::memory_manager(pool); } ARROW_ASSIGN_OR_RAISE(auto zeros, pool->GetImmutableZeros(size)); return std::unique_ptr(