diff --git a/cpp/src/arrow/array/array_test.cc b/cpp/src/arrow/array/array_test.cc
index 4e13c54d1c8..9e11721784a 100644
--- a/cpp/src/arrow/array/array_test.cc
+++ b/cpp/src/arrow/array/array_test.cc
@@ -368,25 +368,27 @@ TEST_F(TestArray, TestMakeArrayOfNull) {
   // clang-format on
   };
 
-  for (int64_t length : {0, 1, 16, 133}) {
-    for (auto type : types) {
-      ARROW_SCOPED_TRACE("type = ", type->ToString());
-      ASSERT_OK_AND_ASSIGN(auto array, MakeArrayOfNull(type, length));
-      ASSERT_EQ(array->type(), type);
-      ASSERT_OK(array->ValidateFull());
-      ASSERT_EQ(array->length(), length);
-      if (is_union(type->id())) {
-        // For unions, MakeArrayOfNull places the nulls in the children
-        ASSERT_EQ(array->null_count(), 0);
-        const auto& union_array = checked_cast<const UnionArray&>(*array);
-        for (int i = 0; i < union_array.num_fields(); ++i) {
-          ASSERT_EQ(union_array.field(i)->null_count(), union_array.field(i)->length());
-        }
-      } else {
-        ASSERT_EQ(array->null_count(), length);
-        for (int64_t i = 0; i < length; ++i) {
-          ASSERT_TRUE(array->IsNull(i));
-          ASSERT_FALSE(array->IsValid(i));
+  for (auto fun : {MakeArrayOfNull, MakeMutableArrayOfNull}) {
+    for (int64_t length : {0, 1, 16, 133}) {
+      for (auto type : types) {
+        ARROW_SCOPED_TRACE("type = ", type->ToString());
+        ASSERT_OK_AND_ASSIGN(auto array, fun(type, length, default_memory_pool()));
+        ASSERT_EQ(array->type(), type);
+        ASSERT_OK(array->ValidateFull());
+        ASSERT_EQ(array->length(), length);
+        if (is_union(type->id())) {
+          // For unions, MakeArrayOfNull places the nulls in the children
+          ASSERT_EQ(array->null_count(), 0);
+          const auto& union_array = checked_cast<const UnionArray&>(*array);
+          for (int i = 0; i < union_array.num_fields(); ++i) {
+            ASSERT_EQ(union_array.field(i)->null_count(), union_array.field(i)->length());
+          }
+        } else {
+          ASSERT_EQ(array->null_count(), length);
+          for (int64_t i = 0; i < length; ++i) {
+            ASSERT_TRUE(array->IsNull(i));
+            ASSERT_FALSE(array->IsValid(i));
+          }
         }
       }
     }
@@ -397,51 +399,56 @@ TEST_F(TestArray, TestMakeArrayOfNullUnion) {
   // Unions need special checking -- the top level null count is 0 (per
   // ARROW-9222) so we check the first child to make sure is contains all nulls
   // and check that the type_ids all point to the first child
-  const int64_t union_length = 10;
-  auto s_union_ty = sparse_union({field("a", utf8()), field("b", int32())}, {0, 1});
-  ASSERT_OK_AND_ASSIGN(auto s_union_nulls, MakeArrayOfNull(s_union_ty, union_length));
-  ASSERT_OK(s_union_nulls->ValidateFull());
-  ASSERT_EQ(s_union_nulls->null_count(), 0);
-  {
-    const auto& typed_union = checked_cast<const SparseUnionArray&>(*s_union_nulls);
-    ASSERT_EQ(typed_union.field(0)->null_count(), union_length);
-
-    // Check type codes are all 0
-    for (int i = 0; i < union_length; ++i) {
-      ASSERT_EQ(typed_union.raw_type_codes()[i], 0);
+  for (auto fun : {MakeArrayOfNull, MakeMutableArrayOfNull}) {
+    const int64_t union_length = 10;
+    auto s_union_ty = sparse_union({field("a", utf8()), field("b", int32())}, {0, 1});
+    ASSERT_OK_AND_ASSIGN(auto s_union_nulls,
+                         fun(s_union_ty, union_length, default_memory_pool()));
+    ASSERT_OK(s_union_nulls->ValidateFull());
+    ASSERT_EQ(s_union_nulls->null_count(), 0);
+    {
+      const auto& typed_union = checked_cast<const SparseUnionArray&>(*s_union_nulls);
+      ASSERT_EQ(typed_union.field(0)->null_count(), union_length);
+
+      // Check type codes are all 0
+      for (int i = 0; i < union_length; ++i) {
+        ASSERT_EQ(typed_union.raw_type_codes()[i], 0);
+      }
     }
-  }
-
-  s_union_ty = sparse_union({field("a", utf8()), field("b", int32())}, {2, 7});
-  ASSERT_OK_AND_ASSIGN(s_union_nulls, MakeArrayOfNull(s_union_ty, union_length));
-  ASSERT_OK(s_union_nulls->ValidateFull());
-  ASSERT_EQ(s_union_nulls->null_count(), 0);
-  {
-    const auto& typed_union = checked_cast<const SparseUnionArray&>(*s_union_nulls);
-    ASSERT_EQ(typed_union.field(0)->null_count(), union_length);
-    // Check type codes are all 2
-    for (int i = 0; i < union_length; ++i) {
-      ASSERT_EQ(typed_union.raw_type_codes()[i], 2);
+    s_union_ty = sparse_union({field("a", utf8()), field("b", int32())}, {2, 7});
+    ASSERT_OK_AND_ASSIGN(s_union_nulls,
+                         fun(s_union_ty, union_length, default_memory_pool()));
+    ASSERT_OK(s_union_nulls->ValidateFull());
+    ASSERT_EQ(s_union_nulls->null_count(), 0);
+    {
+      const auto& typed_union = checked_cast<const SparseUnionArray&>(*s_union_nulls);
+      ASSERT_EQ(typed_union.field(0)->null_count(), union_length);
+
+      // Check type codes are all 2
+      for (int i = 0; i < union_length; ++i) {
+        ASSERT_EQ(typed_union.raw_type_codes()[i], 2);
+      }
     }
-  }
-
-  auto d_union_ty = dense_union({field("a", utf8()), field("b", int32())}, {0, 1});
-  ASSERT_OK_AND_ASSIGN(auto d_union_nulls, MakeArrayOfNull(d_union_ty, union_length));
-  ASSERT_OK(d_union_nulls->ValidateFull());
-  ASSERT_EQ(d_union_nulls->null_count(), 0);
-  {
-    const auto& typed_union = checked_cast<const DenseUnionArray&>(*d_union_nulls);
-    // Child field has length 1 which is a null element
-    ASSERT_EQ(typed_union.field(0)->length(), 1);
-    ASSERT_EQ(typed_union.field(0)->null_count(), 1);
-
-    // Check type codes are all 0 and the offsets point to the first element of
-    // the first child
-    for (int i = 0; i < union_length; ++i) {
-      ASSERT_EQ(typed_union.raw_type_codes()[i], 0);
-      ASSERT_EQ(typed_union.raw_value_offsets()[i], 0);
+    auto d_union_ty = dense_union({field("a", utf8()), field("b", int32())}, {0, 1});
+    ASSERT_OK_AND_ASSIGN(auto d_union_nulls,
+                         fun(d_union_ty, union_length, default_memory_pool()));
+    ASSERT_OK(d_union_nulls->ValidateFull());
+    ASSERT_EQ(d_union_nulls->null_count(), 0);
+    {
+      const auto& typed_union = checked_cast<const DenseUnionArray&>(*d_union_nulls);
+
+      // Child field has length 1 which is a null element
+      ASSERT_EQ(typed_union.field(0)->length(), 1);
+      ASSERT_EQ(typed_union.field(0)->null_count(), 1);
+
+      // Check type codes are all 0 and the offsets point to the first element of
+      // the first child
+      for (int i = 0; i < union_length; ++i) {
+        ASSERT_EQ(typed_union.raw_type_codes()[i], 0);
+        ASSERT_EQ(typed_union.raw_value_offsets()[i], 0);
+      }
     }
   }
 }
@@ -579,30 +586,32 @@ static ScalarVector GetScalars() {
 }
 
 TEST_F(TestArray, TestMakeArrayFromScalar) {
-  ASSERT_OK_AND_ASSIGN(auto null_array, MakeArrayFromScalar(NullScalar(), 5));
-  ASSERT_OK(null_array->ValidateFull());
-  ASSERT_EQ(null_array->length(), 5);
-  ASSERT_EQ(null_array->null_count(), 5);
-
-  auto scalars = GetScalars();
+  for (auto fun : {MakeArrayFromScalar, MakeMutableArrayFromScalar}) {
+    ASSERT_OK_AND_ASSIGN(auto null_array, fun(NullScalar(), 5, default_memory_pool()));
+    ASSERT_OK(null_array->ValidateFull());
+    ASSERT_EQ(null_array->length(), 5);
+    ASSERT_EQ(null_array->null_count(), 5);
+
+    auto scalars = GetScalars();
+
+    for (int64_t length : {16}) {
+      for (auto scalar : scalars) {
+        ASSERT_OK_AND_ASSIGN(auto array, fun(*scalar, length, default_memory_pool()));
+        ASSERT_OK(array->ValidateFull());
+        ASSERT_EQ(array->length(), length);
+        ASSERT_EQ(array->null_count(), 0);
 
-  for (int64_t length : {16}) {
-    for (auto scalar : scalars) {
-      ASSERT_OK_AND_ASSIGN(auto array, MakeArrayFromScalar(*scalar, length));
-      ASSERT_OK(array->ValidateFull());
-      ASSERT_EQ(array->length(), length);
-      ASSERT_EQ(array->null_count(), 0);
-
-      // test case for ARROW-13321
-      for (int64_t i : std::vector<int64_t>{0, length / 2, length - 1}) {
-        ASSERT_OK_AND_ASSIGN(auto s, array->GetScalar(i));
-        AssertScalarsEqual(*s, *scalar, /*verbose=*/true);
+        // test case for ARROW-13321
+        for (int64_t i : std::vector<int64_t>{0, length / 2, length - 1}) {
+          ASSERT_OK_AND_ASSIGN(auto s, array->GetScalar(i));
+          AssertScalarsEqual(*s, *scalar, /*verbose=*/true);
+        }
       }
     }
-  }
-  for (auto scalar : scalars) {
-    AssertAppendScalar(pool_, scalar);
+    for (auto scalar : scalars) {
+      AssertAppendScalar(pool_, scalar);
+    }
   }
 }
 
@@ -610,13 +619,15 @@ TEST_F(TestArray, TestMakeArrayFromScalarSliced) {
   // Regression test for ARROW-13437
   auto scalars = GetScalars();
 
-  for (auto scalar : scalars) {
-    SCOPED_TRACE(scalar->type->ToString());
-    ASSERT_OK_AND_ASSIGN(auto array, MakeArrayFromScalar(*scalar, 32));
-    auto sliced = array->Slice(1, 4);
-    ASSERT_EQ(sliced->length(), 4);
-    ASSERT_EQ(sliced->null_count(), 0);
-    ARROW_EXPECT_OK(sliced->ValidateFull());
+  for (auto fun : {MakeArrayFromScalar, MakeMutableArrayFromScalar}) {
+    for (auto scalar : scalars) {
+      SCOPED_TRACE(scalar->type->ToString());
+      ASSERT_OK_AND_ASSIGN(auto array, fun(*scalar, 32, default_memory_pool()));
+      auto sliced = array->Slice(1, 4);
+      ASSERT_EQ(sliced->length(), 4);
+      ASSERT_EQ(sliced->null_count(), 0);
+      ARROW_EXPECT_OK(sliced->ValidateFull());
+    }
   }
 }
 
@@ -626,14 +637,16 @@ TEST_F(TestArray, TestMakeArrayFromDictionaryScalar) {
   ASSERT_OK_AND_ASSIGN(auto value, MakeScalar(int8(), 1));
   auto scalar = DictionaryScalar({value, dictionary}, type);
 
-  ASSERT_OK_AND_ASSIGN(auto array, MakeArrayFromScalar(scalar, 4));
-  ASSERT_OK(array->ValidateFull());
-  ASSERT_EQ(array->length(), 4);
-  ASSERT_EQ(array->null_count(), 0);
+  for (auto fun : {MakeArrayFromScalar, MakeMutableArrayFromScalar}) {
+    ASSERT_OK_AND_ASSIGN(auto array, fun(scalar, 4, default_memory_pool()));
+    ASSERT_OK(array->ValidateFull());
+    ASSERT_EQ(array->length(), 4);
+    ASSERT_EQ(array->null_count(), 0);
 
-  for (int i = 0; i < 4; i++) {
-    ASSERT_OK_AND_ASSIGN(auto item, array->GetScalar(i));
-    ASSERT_TRUE(item->Equals(scalar));
+    for (int i = 0; i < 4; i++) {
+      ASSERT_OK_AND_ASSIGN(auto item, array->GetScalar(i));
+      ASSERT_TRUE(item->Equals(scalar));
+    }
   }
 }
 
@@ -643,17 +656,21 @@ TEST_F(TestArray, TestMakeArrayFromMapScalar) {
       R"([{"key": "a", "value": 1}, {"key": "b", "value": 2}])");
   auto scalar = MapScalar(value);
 
-  ASSERT_OK_AND_ASSIGN(auto array, MakeArrayFromScalar(scalar, 11));
-  ASSERT_OK(array->ValidateFull());
-  ASSERT_EQ(array->length(), 11);
-  ASSERT_EQ(array->null_count(), 0);
+  for (auto fun : {MakeArrayFromScalar, MakeMutableArrayFromScalar}) {
+    ASSERT_OK_AND_ASSIGN(auto array, fun(scalar, 11, default_memory_pool()));
+    ASSERT_OK(array->ValidateFull());
+    ASSERT_EQ(array->length(), 11);
+    ASSERT_EQ(array->null_count(), 0);
 
-  for (int i = 0; i < 11; i++) {
-    ASSERT_OK_AND_ASSIGN(auto item, array->GetScalar(i));
-    ASSERT_TRUE(item->Equals(scalar));
-  }
+    for (int i = 0; i < 11; i++) {
+      ASSERT_OK_AND_ASSIGN(auto item, array->GetScalar(i));
+      ASSERT_TRUE(item->Equals(scalar));
+    }
 
-  AssertAppendScalar(pool_, std::make_shared<MapScalar>(scalar));
+    if (fun != MakeMutableArrayFromScalar) {
+      AssertAppendScalar(pool_, std::make_shared<MapScalar>(scalar));
+    }
+  }
 }
 
 TEST_F(TestArray, TestMakeEmptyArray) {
diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc
index 413182de0df..a81e2a69dea 100644
--- a/cpp/src/arrow/array/util.cc
+++ b/cpp/src/arrow/array/util.cc
@@ -422,8 +422,7 @@ class NullArrayFactory {
   Status CreateBuffer() {
     ARROW_ASSIGN_OR_RAISE(int64_t buffer_length,
                           GetBufferLength(type_, length_).Finish());
-    ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(buffer_length, pool_));
-    std::memset(buffer_->mutable_data(), 0, buffer_->size());
+    ARROW_ASSIGN_OR_RAISE(buffer_, MakeBufferOfZeros(buffer_length, pool_));
     return Status::OK();
   }
 
@@ -534,6 +533,152 @@ class NullArrayFactory {
   std::shared_ptr<Buffer> buffer_;
 };
 
+// mutable version of NullArrayFactory, i.e. one that doesn't reuse a single buffer
+class MutableNullArrayFactory {
+ private:
+  Result<std::shared_ptr<Buffer>> CreateZeroByteBuffer(size_t scalar_size_bytes) const {
+    ARROW_ASSIGN_OR_RAISE(auto buffer,
+                          AllocateBuffer(length_ * scalar_size_bytes, pool_));
+    std::memset(buffer->mutable_data(), 0, buffer->size());
+    return std::shared_ptr<Buffer>(std::move(buffer));
+  }
+
+  Result<std::shared_ptr<Buffer>> CreateZeroOffsetBuffer(size_t index_size_bytes) const {
+    ARROW_ASSIGN_OR_RAISE(auto buffer,
+                          AllocateBuffer((length_ + 1) * index_size_bytes, pool_));
+    std::memset(buffer->mutable_data(), 0, buffer->size());
+    return std::shared_ptr<Buffer>(std::move(buffer));
+  }
+
+  Result<std::shared_ptr<Buffer>> CreateZeroBitBuffer(size_t scalar_size_bits) const {
+    ARROW_ASSIGN_OR_RAISE(
+        auto buffer,
+        AllocateBuffer(bit_util::BytesForBits(length_ * scalar_size_bits), pool_));
+    std::memset(buffer->mutable_data(), 0, buffer->size());
+    return std::shared_ptr<Buffer>(std::move(buffer));
+  }
+
+  Result<std::unique_ptr<Buffer>> CreateEmptyBuffer() { return AllocateBuffer(0, pool_); }
+
+ public:
+  MutableNullArrayFactory(MemoryPool* pool, const std::shared_ptr<DataType>& type,
+                          int64_t length)
+      : pool_(pool), type_(type), length_(length) {}
+
+  Result<std::shared_ptr<ArrayData>> Create() {
+    std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_fields());
+    ARROW_ASSIGN_OR_RAISE(auto validity, CreateZeroBitBuffer(1));
+    out_ = ArrayData::Make(type_, length_, {validity}, child_data, length_, 0);
+    RETURN_NOT_OK(VisitTypeInline(*type_, this));
+    return out_;
+  }
+
+  Status Visit(const NullType&) {
+    out_->buffers.resize(1, nullptr);
+    return Status::OK();
+  }
+
+  Status Visit(const FixedWidthType& type) {
+    out_->buffers.resize(2);
+    // values
+    ARROW_ASSIGN_OR_RAISE(out_->buffers[1], CreateZeroBitBuffer(type.bit_width()));
+    return Status::OK();
+  }
+
+  template <typename T>
+  enable_if_base_binary<T, Status> Visit(const T&) {
+    out_->buffers.resize(3);
+    // offsets
+    ARROW_ASSIGN_OR_RAISE(out_->buffers[1],
+                          CreateZeroOffsetBuffer(sizeof(typename T::offset_type)));
+    // values
+    ARROW_ASSIGN_OR_RAISE(out_->buffers[2], CreateEmptyBuffer());
+    return Status::OK();
+  }
+
+  template <typename T>
+  enable_if_var_size_list<T, Status> Visit(const T& type) {
+    out_->buffers.resize(2);
+    // offsets
+    ARROW_ASSIGN_OR_RAISE(out_->buffers[1],
+                          CreateZeroOffsetBuffer(sizeof(typename T::offset_type)));
+    // values
+    ARROW_ASSIGN_OR_RAISE(out_->child_data[0], CreateChild(type, 0, /*length=*/0));
+    return Status::OK();
+  }
+
+  Status Visit(const FixedSizeListType& type) {
+    ARROW_ASSIGN_OR_RAISE(out_->child_data[0],
+                          CreateChild(type, 0, length_ * type.list_size()));
+    return Status::OK();
+  }
+
+  Status Visit(const StructType& type) {
+    for (int i = 0; i < type_->num_fields(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(type, i, length_));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const UnionType& type) {
+    out_->buffers.resize(2);
+
+    // First buffer is always null
+    out_->buffers[0] = nullptr;
+
+    // type ID buffer
+    ARROW_ASSIGN_OR_RAISE(out_->buffers[1], AllocateBuffer(length_, pool_));
+    std::memset(out_->buffers[1]->mutable_data(), type.type_codes()[0], length_);
+
+    // For sparse unions, we now create children with the same length as the
+    // parent
+    int64_t child_length = length_;
+    if (type.mode() == UnionMode::DENSE) {
+      // For dense unions, we set the offsets to all zero and create children
+      // with length 1
+      out_->buffers.resize(3);
+      ARROW_ASSIGN_OR_RAISE(out_->buffers[2], CreateZeroByteBuffer(sizeof(int32_t)));
+
+      child_length = 1;
+    }
+    for (int i = 0; i < type_->num_fields(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(type, i, child_length));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const DictionaryType& type) {
+    out_->buffers.resize(2);
+    // dictionary indices
+    ARROW_ASSIGN_OR_RAISE(out_->buffers[1], CreateZeroBitBuffer(type.bit_width()));
+    // dictionary data
+    ARROW_ASSIGN_OR_RAISE(auto typed_null_dict, MakeArrayOfNull(type.value_type(), 0));
+    out_->dictionary = typed_null_dict->data();
+    return Status::OK();
+  }
+
+  Status Visit(const ExtensionType& type) {
+    out_->child_data.resize(type.storage_type()->num_fields());
+    RETURN_NOT_OK(VisitTypeInline(*type.storage_type(), this));
+    return Status::OK();
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("construction of all-null ", type);
+  }
+
+  Result<std::shared_ptr<ArrayData>> CreateChild(const DataType& type, int i,
+                                                 int64_t length) {
+    MutableNullArrayFactory child_factory(pool_, type.field(i)->type(), length);
+    return child_factory.Create();
+  }
+
+  MemoryPool* pool_;
+  std::shared_ptr<DataType> type_;
+  int64_t length_;
+  std::shared_ptr<ArrayData> out_;
+};
+
 class RepeatedArrayFactory {
  public:
   RepeatedArrayFactory(MemoryPool* pool, const Scalar& scalar, int64_t length)
@@ -777,12 +922,28 @@
 }  // namespace
 
+Result<std::shared_ptr<Array>> MakeMutableArrayOfNull(
+    const std::shared_ptr<DataType>& type, int64_t length, MemoryPool* pool) {
+  ARROW_ASSIGN_OR_RAISE(auto data, MutableNullArrayFactory(pool, type, length).Create());
+  return MakeArray(data);
+}
+
 Result<std::shared_ptr<Array>> MakeArrayOfNull(const std::shared_ptr<DataType>& type,
                                                int64_t length, MemoryPool* pool) {
   ARROW_ASSIGN_OR_RAISE(auto data, NullArrayFactory(pool, type, length).Create());
   return MakeArray(data);
 }
 
+Result<std::shared_ptr<Array>> MakeMutableArrayFromScalar(const Scalar& scalar,
+                                                          int64_t length,
+                                                          MemoryPool* pool) {
+  // Null union scalars still have a type code associated
+  if (!scalar.is_valid && !is_union(scalar.type->id())) {
+    return MakeMutableArrayOfNull(scalar.type, length, pool);
+  }
+  return RepeatedArrayFactory(pool, scalar, length).Create();
+}
+
 Result<std::shared_ptr<Array>> MakeArrayFromScalar(const Scalar& scalar, int64_t length,
                                                    MemoryPool* pool) {
   // Null union scalars still have a type code associated
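(Editor's note: the following is illustrative only and not part of the patch. It is a minimal sketch, assuming the patch above is applied, of how the two null-array factories differ for a caller: MakeArrayOfNull may be backed by the memory pool's shared read-only zero block, while MakeMutableArrayOfNull always allocates fresh, writable buffers. The file and function names below are hypothetical.)

    // null_array_sketch.cc -- hypothetical example, not part of the patch
    #include <iostream>
    #include "arrow/array.h"
    #include "arrow/array/util.h"
    #include "arrow/buffer.h"
    #include "arrow/result.h"
    #include "arrow/type.h"

    arrow::Status NullArrayExample() {
      // May reuse the pool's shared block of zeros, so its buffers can be read-only.
      ARROW_ASSIGN_OR_RAISE(auto shared_nulls,
                            arrow::MakeArrayOfNull(arrow::int32(), /*length=*/1024));
      // Always allocates fresh buffers that the caller is free to overwrite in place.
      ARROW_ASSIGN_OR_RAISE(auto writable_nulls,
                            arrow::MakeMutableArrayOfNull(arrow::int32(), /*length=*/1024));
      std::cout << "shared variant value buffer mutable: "
                << shared_nulls->data()->buffers[1]->is_mutable() << "\n"
                << "mutable variant value buffer mutable: "
                << writable_nulls->data()->buffers[1]->is_mutable() << "\n";
      return arrow::Status::OK();
    }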
diff --git a/cpp/src/arrow/array/util.h b/cpp/src/arrow/array/util.h
index 6e6c61bd03d..f32d28f5859 100644
--- a/cpp/src/arrow/array/util.h
+++ b/cpp/src/arrow/array/util.h
@@ -37,7 +37,20 @@ namespace arrow {
 ARROW_EXPORT
 std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data);
 
-/// \brief Create a strongly-typed Array instance with all elements null
+/// \brief Create a strongly-typed mutable Array instance with all elements initially set
+/// to null
+/// \param[in] type the array type \param[in] length the array length
+/// \param[in] pool the memory pool to allocate memory from
+ARROW_EXPORT
+Result<std::shared_ptr<Array>> MakeMutableArrayOfNull(
+    const std::shared_ptr<DataType>& type, int64_t length,
+    MemoryPool* pool = default_memory_pool());
+
+/// \brief Create a strongly-typed immutable Array instance with all elements null
+///
+/// This function may reuse a single zero buffer, but may also defer to
+/// MakeMutableArrayOfNull().
+///
 /// \param[in] type the array type
 /// \param[in] length the array length
 /// \param[in] pool the memory pool to allocate memory from
@@ -46,7 +59,20 @@ Result<std::shared_ptr<Array>> MakeArrayOfNull(const std::shared_ptr<DataType>&
                                                int64_t length,
                                                MemoryPool* pool = default_memory_pool());
 
-/// \brief Create an Array instance whose slots are the given scalar
+/// \brief Create a mutable Array instance whose slots are initialized with the given
+/// scalar
+/// \param[in] scalar the value with which to fill the array
+/// \param[in] length the array length
+/// \param[in] pool the memory pool to allocate memory from
+ARROW_EXPORT
+Result<std::shared_ptr<Array>> MakeMutableArrayFromScalar(
+    const Scalar& scalar, int64_t length, MemoryPool* pool = default_memory_pool());
+
+/// \brief Create an immutable Array instance whose slots are set to the given scalar
+///
+/// This function may reuse buffers if they contain the same (repeated) value to save
+/// memory, but may also defer to MakeMutableArrayFromScalar().
+///
 /// \param[in] scalar the value with which to fill the array
 /// \param[in] length the array length
 /// \param[in] pool the memory pool to allocate memory from
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index cfd525ab2d6..37624c6977e 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -462,6 +462,13 @@ ARROW_EXPORT
 Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(
     const int64_t size, MemoryPool* pool = NULLPTR);
 
+/// \brief Allocate or reuse an immutable buffer filled with zeros from a memory pool.
+///
+/// \param[in] size size of buffer to allocate
+/// \param[in] pool a memory pool
+ARROW_EXPORT
+Result<std::shared_ptr<Buffer>> MakeBufferOfZeros(int64_t size, MemoryPool* pool);
+
 /// \brief Allocate a bitmap buffer from a memory pool
 /// no guarantee on values is provided.
 ///
diff --git a/cpp/src/arrow/compute/kernels/util_internal.cc b/cpp/src/arrow/compute/kernels/util_internal.cc
index 846fa26baf2..7bd52b1074a 100644
--- a/cpp/src/arrow/compute/kernels/util_internal.cc
+++ b/cpp/src/arrow/compute/kernels/util_internal.cc
@@ -70,7 +70,7 @@ ArrayKernelExec TrivialScalarUnaryAsArraysExec(ArrayKernelExec exec,
     }
 
     ARROW_ASSIGN_OR_RAISE(Datum array_in, MakeArrayFromScalar(*batch[0].scalar(), 1));
-    ARROW_ASSIGN_OR_RAISE(Datum array_out, MakeArrayFromScalar(*out->scalar(), 1));
+    ARROW_ASSIGN_OR_RAISE(Datum array_out, MakeMutableArrayFromScalar(*out->scalar(), 1));
     RETURN_NOT_OK(exec(ctx, ExecBatch{{std::move(array_in)}, 1}, &array_out));
     ARROW_ASSIGN_OR_RAISE(*out, array_out.make_array()->GetScalar(0));
     return Status::OK();
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index 2fab6f38255..dc28f21bf0e 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "arrow/memory_pool.h"
+#include "arrow/util/atomic_shared_ptr.h"
 
 #include <algorithm>  // IWYU pragma: keep
 #include <cstdlib>
@@ -536,6 +537,38 @@ int64_t MemoryPool::max_memory() const { return -1; }
 
 // MemoryPool implementation that delegates its core duty
 // to an Allocator class.
+class ImmutableZeros : public Buffer {
+ public:
+  explicit ImmutableZeros(uint8_t* data, int64_t size, MemoryPool* pool)
+      : Buffer(data, size, CPUDevice::memory_manager(pool)), pool_(pool) {}
+
+  ImmutableZeros() : Buffer(nullptr, 0), pool_(nullptr) {}
+
+  ~ImmutableZeros() override;
+
+  // Prevent copies and handle moves explicitly to avoid double free
+  ImmutableZeros(const ImmutableZeros&) = delete;
+  ImmutableZeros& operator=(const ImmutableZeros&) = delete;
+
+  ImmutableZeros(ImmutableZeros&& other) noexcept
+      : Buffer(other.data_, other.size_, other.memory_manager()), pool_(other.pool_) {
+    other.pool_ = nullptr;
+    other.data_ = nullptr;
+    other.size_ = 0;
+  }
+
+  ImmutableZeros& operator=(ImmutableZeros&& other) noexcept {
+    SetMemoryManager(other.memory_manager());
+    std::swap(pool_, other.pool_);
+    std::swap(data_, other.data_);
+    std::swap(size_, other.size_);
+    return *this;
+  }
+
+ private:
+  MemoryPool* pool_ = nullptr;
+};
+
 #ifndef NDEBUG
 static constexpr uint8_t kAllocPoison = 0xBC;
 static constexpr uint8_t kReallocPoison = 0xBD;
@@ -603,7 +636,98 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef ARROW_DISABLE_MMAP_FOR_IMMUTABLE_ZEROS
+    // TODO: jemalloc and mimalloc support zero-initialized allocations as
+    // well, which might be faster than allocate + memset.
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+#else
+    return internal::MemoryMapZeros(size, out);
+#endif
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef ARROW_DISABLE_MMAP_FOR_IMMUTABLE_ZEROS
+    Free(buffer, size);
+#else
+    internal::MemoryUnmapZeros(buffer, size);
+#endif
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.
+    auto current_buffer = internal::atomic_load(&immutable_zeros_cache_);
+
+    // If this buffer satisfies the requirements, return it.
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Acquire the lock for allocating a new buffer.
+    std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+    // Between our previous atomic load and acquisition of the lock, another
+    // thread may have allocated a buffer. So we need to check again.
+    current_buffer = internal::atomic_load(&immutable_zeros_cache_);
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Let's now figure out a good size to allocate. This is done
+    // heuristically, with the following rules:
+    //  - allocate at least the requested size (obviously);
+    //  - allocate at least 2x the previous size;
+    //  - allocate at least kMinAllocSize bytes (to avoid lots of small
+    //    allocations).
+    static const int64_t kMinAllocSize = 4096;
+    int64_t alloc_size =
+        std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);
+
+    // Attempt to allocate the block.
+    uint8_t* data = nullptr;
+    auto result = AllocateImmutableZeros(alloc_size, &data);
+
+    // If we fail to do so, fall back to trying to allocate the requested size
+    // exactly as a last-ditch effort.
+    if (!result.ok()) {
+      alloc_size = size;
+      RETURN_NOT_OK(AllocateImmutableZeros(alloc_size, &data));
+    }
+    DCHECK_NE(data, nullptr);
+
+    // Move ownership of the data block into an ImmutableZeros object. It will
+    // free the block when destroyed, i.e. when all shared_ptr references to it
+    // are reset or go out of scope.
+    current_buffer = std::make_shared<ImmutableZeros>(data, alloc_size, this);
+
+    // Store a reference to the new block in the cache, so subsequent calls to
+    // this function (from this thread or from other threads) can use it, too.
+    internal::atomic_store(&immutable_zeros_cache_, current_buffer);
+
+    return std::move(current_buffer);
+  }
+
+  void ReleaseUnused() override {
+    // Get rid of the ImmutableZeros cache if we're the only one using it. If
+    // there are other pieces of code using it, getting rid of the cache won't
+    // deallocate it anyway, so it's better to hold onto it.
+    {
+      auto cache = internal::atomic_load(&immutable_zeros_cache_);
+
+      // Because we now have a copy in our thread, the use count will be 2 if
+      // nothing else is using it.
+      if (cache.use_count() <= 2) {
+        internal::atomic_store(&immutable_zeros_cache_,
+                               std::shared_ptr<ImmutableZeros>());
+      }
+    }
+
+    Allocator::ReleaseUnused();
+  }
 
   int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }
 
@@ -611,6 +735,8 @@ class BaseMemoryPoolImpl : public MemoryPool {
 
  protected:
   internal::MemoryPoolStats stats_;
+  std::shared_ptr<ImmutableZeros> immutable_zeros_cache_;
+  std::mutex immutable_zeros_mutex_;
 };
 
 class SystemMemoryPool : public BaseMemoryPoolImpl<SystemAllocator> {
@@ -719,6 +845,27 @@ static struct GlobalState {
 #endif
 } global_state;
 
+ImmutableZeros::~ImmutableZeros() {
+  // Avoid calling pool_->FreeImmutableZeros if the global pools are destroyed
+  // (XXX this will not work with user-defined pools)
+
+  // This can happen if a Future is destructing on one thread while or
+  // after memory pools are destructed on the main thread (as there is
+  // no guarantee of destructor order between thread/memory pools)
+  if (data_ && !global_state.is_finalizing()) {
+    pool_->FreeImmutableZeros(const_cast<uint8_t*>(data_), size_);
+  }
+}
+
+Result<std::shared_ptr<Buffer>> MemoryPool::GetImmutableZeros(int64_t size) {
+  uint8_t* data;
+  RETURN_NOT_OK(Allocate(size, &data));
+  std::memset(data, 0, size);
+  return std::make_shared<ImmutableZeros>(data, size, this);
+}
+
+void MemoryPool::FreeImmutableZeros(uint8_t* buffer, int64_t size) { Free(buffer, size); }
+
 MemoryPool* system_memory_pool() { return global_state.system_memory_pool(); }
 
 Status jemalloc_memory_pool(MemoryPool** out) {
@@ -949,7 +1096,7 @@ class PoolBuffer final : public ResizableBuffer {
     return Status::OK();
   }
 
-  static std::shared_ptr<PoolBuffer> MakeShared(MemoryPool* pool) {
+  static std::unique_ptr<PoolBuffer> Make(MemoryPool* pool) {
     std::shared_ptr<MemoryManager> mm;
     if (pool == nullptr) {
      pool = default_memory_pool();
@@ -957,10 +1104,23 @@ class PoolBuffer final : public ResizableBuffer {
     } else {
       mm = CPUDevice::memory_manager(pool);
     }
-    return std::make_shared<PoolBuffer>(std::move(mm), pool);
+    return std::unique_ptr<PoolBuffer>(new PoolBuffer(std::move(mm), pool));
   }
 
-  static std::unique_ptr<PoolBuffer> MakeUnique(MemoryPool* pool) {
+ private:
+  MemoryPool* pool_;
+};
+
+/// An immutable Buffer containing zeros, whose lifetime is tied to a particular
+/// MemoryPool
+class ImmutableZerosPoolBuffer final : public Buffer {
+ public:
+  explicit ImmutableZerosPoolBuffer(std::shared_ptr<Buffer>&& zeros, int64_t size,
+                                    std::shared_ptr<MemoryManager>&& mm)
+      : Buffer(zeros->data(), size, std::move(mm)), zeros_(std::move(zeros)) {}
+
+  static Result<std::unique_ptr<Buffer>> Make(int64_t size,
+                                              MemoryPool* pool) {
     std::shared_ptr<MemoryManager> mm;
     if (pool == nullptr) {
       pool = default_memory_pool();
@@ -968,11 +1128,15 @@ class PoolBuffer final : public ResizableBuffer {
     } else {
       mm = CPUDevice::memory_manager(pool);
     }
-    return std::unique_ptr<PoolBuffer>(new PoolBuffer(std::move(mm), pool));
+    ARROW_ASSIGN_OR_RAISE(auto zeros, pool->GetImmutableZeros(size));
+    return std::unique_ptr<Buffer>(
+        new ImmutableZerosPoolBuffer(std::move(zeros), size, std::move(mm)));
   }
 
  private:
-  MemoryPool* pool_;
+  // Note: we don't use this value directly; however, it needs to be here to keep a
+  // reference to the shared_ptr to keep the underlying zero buffer alive.
+  std::shared_ptr<Buffer> zeros_;
 };
 
 namespace {
@@ -989,13 +1153,16 @@ inline Result<BufferPtr> ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t
 }  // namespace
 
 Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size, MemoryPool* pool) {
-  return ResizePoolBuffer<std::unique_ptr<Buffer>>(PoolBuffer::MakeUnique(pool), size);
+  return ResizePoolBuffer<std::unique_ptr<Buffer>>(PoolBuffer::Make(pool), size);
 }
 
 Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(const int64_t size,
                                                                  MemoryPool* pool) {
-  return ResizePoolBuffer<std::unique_ptr<ResizableBuffer>>(PoolBuffer::MakeUnique(pool),
-                                                            size);
+  return ResizePoolBuffer<std::unique_ptr<ResizableBuffer>>(PoolBuffer::Make(pool), size);
+}
+
+Result<std::shared_ptr<Buffer>> MakeBufferOfZeros(const int64_t size, MemoryPool* pool) {
+  return ImmutableZerosPoolBuffer::Make(size, pool);
+}
 
 }  // namespace arrow
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 58b375af3a9..97f6568aa61 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -81,12 +81,32 @@ class ARROW_EXPORT MemoryPool {
 
   /// Free an allocated region.
   ///
-  /// @param buffer Pointer to the start of the allocated memory region
-  /// @param size Allocated size located at buffer. An allocator implementation
+  /// \param buffer Pointer to the start of the allocated memory region
+  /// \param size Allocated size located at buffer. An allocator implementation
   /// may use this for tracking the amount of allocated bytes as well as for
   /// faster deallocation if supported by its backend.
   virtual void Free(uint8_t* buffer, int64_t size) = 0;
 
+  /// Return a shared block of immutable zero bytes of at least the given size.
+  ///
+  /// These blocks are useful when some other readable block of data is needed
+  /// to comply with an interface, but the contents of the block don't matter
+  /// and/or should just be zero. Unlike other allocations, the same underlying
+  /// block of memory can be shared between all users (for a particular size
+  /// limit), thus reducing memory footprint. In fact, some implementations do
+  /// not require allocation of physical memory at all, and instead rely on
+  /// MMU tricks to create arbitrarily large blocks of zeroed virtual memory.
+  ///
+  /// The allocated region shall be 64-byte aligned. The region will be
+  /// deallocated automatically when all shared_ptrs to the region are
+  /// destroyed.
+  ///
+  /// While this returns a shared_ptr to a Buffer, you may want to use the
+  /// MakeBufferOfZeros() function, analogous to AllocateBuffer(). This
+  /// function returns a Buffer that advertises exactly the requested size,
+  /// and that also embeds/hides the shared ownership of the underlying buffer.
+  virtual Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size);
+
   /// Return unused memory to the OS
   ///
   /// Only applies to allocators that hold onto unused memory. This will be
@@ -109,6 +129,16 @@ class ARROW_EXPORT MemoryPool {
 
  protected:
   MemoryPool() = default;
+
+  /// Free a memory region allocated by GetImmutableZeros().
+  ///
+  /// \param buffer Pointer to the start of the allocated memory region
+  /// \param size Allocated size located at buffer. An allocator implementation
+  /// may use this for tracking the amount of allocated bytes as well as for
+  /// faster deallocation if supported by its backend.
+  virtual void FreeImmutableZeros(uint8_t* buffer, int64_t size);
+
+  friend class ImmutableZeros;
 };
 
 class ARROW_EXPORT LoggingMemoryPool : public MemoryPool {
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index 853e30081dc..ca4491fbfee 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -1138,6 +1138,41 @@ Status MemoryAdviseWillNeed(const std::vector<MemoryRegion>& regions) {
 #endif
 }
 
+//
+// Compatible way for getting (large amounts of) immutable, zeroed memory
+//
+
+Status MemoryMapZeros(size_t size, uint8_t** out) {
+#ifdef __linux__
+  *out = static_cast<uint8_t*>(
+      mmap(nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+  if (*out == MAP_FAILED) {
+    auto err = errno;
+    return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                               strerror(err));
+  }
+  return Status::OK();
+#else
+  // Fallback.
+  // TODO: can use VirtualAlloc for Windows
+  *out = static_cast<uint8_t*>(std::calloc(1, size));
+  if (*out == nullptr) {
+    auto err = errno;
+    return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                               strerror(err));
+  }
+  return Status::OK();
+#endif
+}
+
+void MemoryUnmapZeros(uint8_t* buffer, size_t size) {
+#ifdef __linux__
+  munmap(buffer, size);
+#else
+  std::free(buffer);
+#endif
+}
+
 //
 // Closing files
 //
diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h
index c094727a647..04600355232 100644
--- a/cpp/src/arrow/util/io_util.h
+++ b/cpp/src/arrow/util/io_util.h
@@ -179,6 +179,11 @@ Status MemoryMapRemap(void* addr, size_t old_size, size_t new_size, int fildes,
 ARROW_EXPORT
 Status MemoryAdviseWillNeed(const std::vector<MemoryRegion>& regions);
 
+ARROW_EXPORT
+Status MemoryMapZeros(size_t size, uint8_t** out);
+ARROW_EXPORT
+void MemoryUnmapZeros(uint8_t* buffer, size_t size);
+
 ARROW_EXPORT
 Result<std::string> GetEnvVar(const char* name);
 ARROW_EXPORT
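(Editor's note: the following is illustrative only and not part of the patch. It is a minimal sketch, assuming the patch is applied, of the zero-buffer facilities it introduces: MakeBufferOfZeros() returns an immutable Buffer of exactly the requested size, while the lower-level MemoryPool::GetImmutableZeros() hook hands out the shared underlying block, which may be larger than requested. The file and function names below are hypothetical.)

    // zero_buffer_sketch.cc -- hypothetical example, not part of the patch
    #include <iostream>
    #include "arrow/buffer.h"
    #include "arrow/memory_pool.h"
    #include "arrow/result.h"

    arrow::Status ZeroBufferExample() {
      arrow::MemoryPool* pool = arrow::default_memory_pool();

      // Read-only zeros of exactly the requested size; repeated requests may end up
      // sharing one underlying block instead of allocating and zeroing each time.
      ARROW_ASSIGN_OR_RAISE(auto zeros_small, arrow::MakeBufferOfZeros(4096, pool));
      ARROW_ASSIGN_OR_RAISE(auto zeros_large, arrow::MakeBufferOfZeros(1 << 20, pool));
      std::cout << zeros_small->size() << " " << zeros_large->size() << "\n";

      // The lower-level pool hook returns the shared block itself; it is at least as
      // large as requested and stays alive while any shared_ptr to it is held.
      ARROW_ASSIGN_OR_RAISE(auto shared_block, pool->GetImmutableZeros(4096));
      std::cout << (shared_block->size() >= 4096) << "\n";
      return arrow::Status::OK();
    }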