From b565f67056df5dbfabe6e10910d7bba230e5f1ad Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Mon, 26 Feb 2024 21:09:40 +0800
Subject: [PATCH 01/14] WIP

---
 cpp/src/arrow/scalar_test.cc | 38 ++++++++++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/cpp/src/arrow/scalar_test.cc b/cpp/src/arrow/scalar_test.cc
index 09dfde32271..c98e27654a1 100644
--- a/cpp/src/arrow/scalar_test.cc
+++ b/cpp/src/arrow/scalar_test.cc
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include <future>
 #include
 #include
 
@@ -1984,4 +1985,41 @@ TEST_F(TestExtensionScalar, ValidateErrors) {
   AssertValidationFails(scalar);
 }
 
+template <typename T>
+class TestScalarScratchSpace : public ::testing::Test {
+ public:
+  TestScalarScratchSpace() = default;
+};
+
+TYPED_TEST_SUITE(TestScalarScratchSpace, BaseBinaryOrBinaryViewLikeArrowTypes);
+
+// GH-40069: race condition when filling the scratch space of a scalar in parallel.
+TYPED_TEST(TestScalarScratchSpace, ParallelFillScratchSpace) {
+  using ScalarType = typename TypeTraits<TypeParam>::ScalarType;
+
+  std::string value = "test data";
+
+  auto scalar_val = std::make_shared<ScalarType>(value);
+  // ASSERT_EQ(value, scalar_val->value);
+  ASSERT_TRUE(scalar_val->is_valid);
+  ASSERT_OK(scalar_val->ValidateFull());
+
+  auto expected_type = TypeTraits<TypeParam>::type_singleton();
+  ASSERT_TRUE(scalar_val->type->Equals(*expected_type));
+
+  ArraySpan span1, span2;
+
+  auto fut1 = std::async(std::launch::async, [&]() {
+    span1.FillFromScalar(*scalar_val);
+  });
+  auto fut2 = std::async(std::launch::async, [&]() {
+    span2.FillFromScalar(*scalar_val);
+  });
+  fut1.wait();
+  fut2.wait();
+
+  ASSERT_EQ(span1.length, 1);
+  ASSERT_EQ(span2.length, 1);
+}
+
 } // namespace arrow

From d520f741a0cc1b059861f36f77d70ef421816bb3 Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Mon, 26 Feb 2024 23:56:38 +0800
Subject: [PATCH 02/14] Use relaxed atomic store to write scalar scratch space

---
 cpp/src/arrow/array/data.cc | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 80c411dfa6a..7bce6e35c56 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -286,8 +286,10 @@ namespace {
 template <typename offset_type>
 BufferSpan OffsetsForScalar(uint8_t* scratch_space, offset_type value_size) {
   auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
-  offsets[0] = 0;
-  offsets[1] = static_cast<offset_type>(value_size);
+  reinterpret_cast<std::atomic<offset_type>*>(&offsets[0])
+      ->store(0, std::memory_order_relaxed);
+  reinterpret_cast<std::atomic<offset_type>*>(&offsets[1])
+      ->store(static_cast<offset_type>(value_size), std::memory_order_relaxed);
   static_assert(2 * sizeof(offset_type) <= 16);
   return {scratch_space, sizeof(offset_type) * 2};
 }
@@ -297,8 +299,10 @@ template <typename offset_type>
 std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(uint8_t* scratch_space,
                                                            offset_type value_size) {
   auto* offsets = scratch_space;
   auto* sizes = scratch_space + sizeof(offset_type);
-  reinterpret_cast<offset_type*>(offsets)[0] = 0;
-  reinterpret_cast<offset_type*>(sizes)[0] = value_size;
+  reinterpret_cast<std::atomic<offset_type>*>(offsets)->store(0,
+                                                              std::memory_order_relaxed);
+  reinterpret_cast<std::atomic<offset_type>*>(sizes)->store(
+      static_cast<offset_type>(value_size), std::memory_order_relaxed);
   static_assert(2 * sizeof(offset_type) <= 16);
   return {BufferSpan{offsets, sizeof(offset_type)},
           BufferSpan{sizes, sizeof(offset_type)}};
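
A note on the approach in patch 02: under the C++ memory model, concurrent writes of the *same* bytes through plain (non-atomic) objects are still a data race, which is exactly what TSAN reports for GH-40069. Routing the stores through std::atomic with memory_order_relaxed makes them well-defined without imposing any ordering. A minimal standalone sketch of the idea (not Arrow code; names and the 9-byte value are illustrative):

    // Two threads write identical offsets into shared scratch space. With plain
    // int32_t stores this is undefined behavior and TSAN flags it; relaxed atomic
    // stores make it race-free while still compiling to plain machine stores on
    // mainstream platforms.
    #include <atomic>
    #include <cstdint>
    #include <thread>

    alignas(int64_t) uint8_t scratch[sizeof(int32_t) * 2];

    void FillOffsets(int32_t value_size) {
      auto* offsets = reinterpret_cast<std::atomic<int32_t>*>(scratch);
      offsets[0].store(0, std::memory_order_relaxed);
      offsets[1].store(value_size, std::memory_order_relaxed);
    }

    int main() {
      std::thread t1(FillOffsets, 9);  // "test data" is 9 bytes long
      std::thread t2(FillOffsets, 9);  // same value, yet a race without atomics
      t1.join();
      t2.join();
    }

Casting raw storage to std::atomic is itself outside the letter of the standard, which is why patch 04 below adds a static_assert that the atomic has the same size as the raw offset type (see also the sketch after patch 06).
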
From 4c67e74c4c869bff51d4b60d91e624fc47cf5740 Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Tue, 27 Feb 2024 01:57:38 +0800
Subject: [PATCH 03/14] WIP

---
 cpp/src/arrow/scalar_test.cc | 65 ++++++++++++++++++++++--------------
 1 file changed, 40 insertions(+), 25 deletions(-)

diff --git a/cpp/src/arrow/scalar_test.cc b/cpp/src/arrow/scalar_test.cc
index c98e27654a1..1fcabb07fa9 100644
--- a/cpp/src/arrow/scalar_test.cc
+++ b/cpp/src/arrow/scalar_test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include
+#include <future>
 #include
 #include
 #include
@@ -23,7 +24,6 @@
 #include
 #include
 #include
-#include <future>
 #include
 #include
 
@@ -1986,40 +1986,55 @@ TEST_F(TestExtensionScalar, ValidateErrors) {
 }
 
 template <typename T>
-class TestScalarScratchSpace : public ::testing::Test {
- public:
-  TestScalarScratchSpace() = default;
+struct TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits {
+  using DataType = T;
+  using ScalarType = typename TypeTraits<T>::ScalarType;
+
+  static std::shared_ptr<Scalar> MakeScalar() {
+    std::string value = "test data";
+    return std::make_shared<ScalarType>(value);
+  }
 };
 
-TYPED_TEST_SUITE(TestScalarScratchSpace, BaseBinaryOrBinaryViewLikeArrowTypes);
+using TestFillArraySpanBaseBinaryOrBinaryViewLikeTypes =
+    ::testing::Types<TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<BinaryType>,
+                     TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<LargeBinaryType>,
+                     TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<StringType>,
+                     TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<LargeStringType>,
+                     TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<BinaryViewType>,
+                     TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<StringViewType>>;
+
+template <typename T>
+class TestFillArraySpan : public ::testing::Test {
+ public:
+  TestFillArraySpan() = default;
+};
+
+TYPED_TEST_SUITE(TestFillArraySpan, TestFillArraySpanBaseBinaryOrBinaryViewLikeTypes);
 
 // GH-40069: race condition when filling the scratch space of a scalar in parallel.
-TYPED_TEST(TestScalarScratchSpace, ParallelFillScratchSpace) {
-  using ScalarType = typename TypeTraits<TypeParam>::ScalarType;
+TYPED_TEST(TestFillArraySpan, ParallelFill) {
+  using DataType = typename TypeParam::DataType;
+  // using ddScalarType = typename TypeParam::ScalarType;
 
-  std::string value = "test data";
+  auto scalar_val = TypeParam::MakeScalar();
 
-  auto scalar_val = std::make_shared<ScalarType>(value);
-  // ASSERT_EQ(value, scalar_val->value);
-  ASSERT_TRUE(scalar_val->is_valid);
-  ASSERT_OK(scalar_val->ValidateFull());
+  // Lambda to fill an ArraySpan with the scalar (and consequently fill the scratch
+  // space of the scalar), and use the ArraySpan a bit.
+  auto array_span_from_scalar = [&]() {
+    auto expected_type = TypeTraits<DataType>::type_singleton();
 
-  auto expected_type = TypeTraits<TypeParam>::type_singleton();
-  ASSERT_TRUE(scalar_val->type->Equals(*expected_type));
-
-  ArraySpan span1, span2;
-
-  auto fut1 = std::async(std::launch::async, [&]() {
-    span1.FillFromScalar(*scalar_val);
-  });
-  auto fut2 = std::async(std::launch::async, [&]() {
-    span2.FillFromScalar(*scalar_val);
-  });
+    ArraySpan span;
+    span.FillFromScalar(*scalar_val);
+    ASSERT_TRUE(span.type->Equals(*expected_type));
+    ASSERT_EQ(span.length, 1);
+  };
+  // Two concurrent calls to the lambda are just enough for TSAN to report a race
+  // condition.
+  auto fut1 = std::async(std::launch::async, array_span_from_scalar);
+  auto fut2 = std::async(std::launch::async, array_span_from_scalar);
   fut1.wait();
   fut2.wait();
-
-  ASSERT_EQ(span1.length, 1);
-  ASSERT_EQ(span2.length, 1);
 }
 
 } // namespace arrow

From c9e39b1aaf7e8b2ed96e570bcdb42a044fdd9176 Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Tue, 27 Feb 2024 13:30:19 +0800
Subject: [PATCH 04/14] Static assert that the sizes of atomic offset type and
 offset type are equal

---
 cpp/src/arrow/array/data.cc | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 7bce6e35c56..c95f4009194 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -286,6 +286,7 @@ namespace {
 template <typename offset_type>
 BufferSpan OffsetsForScalar(uint8_t* scratch_space, offset_type value_size) {
   auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
+  static_assert(sizeof(std::atomic<offset_type>) == sizeof(offset_type));
   reinterpret_cast<std::atomic<offset_type>*>(&offsets[0])
       ->store(0, std::memory_order_relaxed);
   reinterpret_cast<std::atomic<offset_type>*>(&offsets[1])
@@ -299,6 +300,7 @@ std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(uint8_t* scratch_space,
                                                            offset_type value_size) {
   auto* offsets = scratch_space;
   auto* sizes = scratch_space + sizeof(offset_type);
+  static_assert(sizeof(std::atomic<offset_type>) == sizeof(offset_type));
   reinterpret_cast<std::atomic<offset_type>*>(offsets)->store(0,
                                                               std::memory_order_relaxed);
   reinterpret_cast<std::atomic<offset_type>*>(sizes)->store(

From 1ea077ad752c524e57b142848414b46199234503 Mon Sep 17 00:00:00 2001
From: Rossi Sun
Date: Tue, 27 Feb 2024 13:31:03 +0800
Subject: [PATCH 05/14] Update cpp/src/arrow/array/data.cc

Co-authored-by: Benjamin Kietzman
---
 cpp/src/arrow/array/data.cc | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index c95f4009194..0b3ca4b8b87 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -287,10 +287,9 @@ template <typename offset_type>
 BufferSpan OffsetsForScalar(uint8_t* scratch_space, offset_type value_size) {
   auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
   static_assert(sizeof(std::atomic<offset_type>) == sizeof(offset_type));
-  reinterpret_cast<std::atomic<offset_type>*>(&offsets[0])
-      ->store(0, std::memory_order_relaxed);
-  reinterpret_cast<std::atomic<offset_type>*>(&offsets[1])
-      ->store(static_cast<offset_type>(value_size), std::memory_order_relaxed);
+  auto* offsets = reinterpret_cast<std::atomic<offset_type>*>(scratch_space);
+  offsets[0].store(0, std::memory_order_relaxed);
+  offsets[1].store(value_size, std::memory_order_relaxed);
   static_assert(2 * sizeof(offset_type) <= 16);
   return {scratch_space, sizeof(offset_type) * 2};
 }

From 4af89a5ab4cd073bfb7e8cfa6d57125d4f5092e1 Mon Sep 17 00:00:00 2001
From: Rossi Sun
Date: Tue, 27 Feb 2024 13:31:12 +0800
Subject: [PATCH 06/14] Update cpp/src/arrow/array/data.cc

Co-authored-by: Benjamin Kietzman
---
 cpp/src/arrow/array/data.cc | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 0b3ca4b8b87..132db07302f 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -300,10 +300,10 @@ std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(uint8_t* scratch_space,
                                                            offset_type value_size) {
   auto* offsets = scratch_space;
   auto* sizes = scratch_space + sizeof(offset_type);
   static_assert(sizeof(std::atomic<offset_type>) == sizeof(offset_type));
-  reinterpret_cast<std::atomic<offset_type>*>(offsets)->store(0,
-                                                              std::memory_order_relaxed);
-  reinterpret_cast<std::atomic<offset_type>*>(sizes)->store(
-      static_cast<offset_type>(value_size), std::memory_order_relaxed);
+  auto* offsets = reinterpret_cast<std::atomic<offset_type>*>(scratch_space);
+  auto* sizes = offsets + 1;
+  offsets[0].store(0, std::memory_order_relaxed);
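
The static_assert introduced in patch 04 (and kept through the suggested changes in patches 05 and 06) is what makes the cast-to-atomic trick defensible: it only compiles where std::atomic<offset_type> is layout-compatible with the raw offset type. A standalone, runnable sketch of the compile-time checks one would want here; the is_always_lock_free check is an extra suggestion on top of what the patches assert:

    #include <atomic>
    #include <cstdint>

    template <typename offset_type>
    void CheckAtomicLayout() {
      // The atomic must add no storage (i.e. no embedded lock) ...
      static_assert(sizeof(std::atomic<offset_type>) == sizeof(offset_type));
      // ... and should be lock-free, so a relaxed store is a plain machine write.
      static_assert(std::atomic<offset_type>::is_always_lock_free);  // C++17
    }

    int main() {
      CheckAtomicLayout<int32_t>();  // offsets of binary-like and list-like types
      CheckAtomicLayout<int64_t>();  // offsets of the "large" variants
    }

Note also that patches 05 and 06, taken in isolation, leave a duplicate `offsets` declaration behind; they are GitHub suggestion commits, and patch 07 below resolves the conflict.
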
+  sizes[0].store(value_size, std::memory_order_relaxed);
   static_assert(2 * sizeof(offset_type) <= 16);
   return {BufferSpan{offsets, sizeof(offset_type)},
           BufferSpan{sizes, sizeof(offset_type)}};

From aece9de37885ca35c7dc7e126caee8b50e34db7d Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Tue, 27 Feb 2024 14:03:16 +0800
Subject: [PATCH 07/14] Fix conflict and add some comments

---
 cpp/src/arrow/array/data.cc | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 132db07302f..2c7f5f35686 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -285,7 +285,9 @@ namespace {
 template <typename offset_type>
 BufferSpan OffsetsForScalar(uint8_t* scratch_space, offset_type value_size) {
-  auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
+  // The scalar scratch space could be filled concurrently (with the same content), thus
+  // we use relaxed atomic stores. This consequently requires the size of the atomic to
+  // match the size of the offset type.
   static_assert(sizeof(std::atomic<offset_type>) == sizeof(offset_type));
   auto* offsets = reinterpret_cast<std::atomic<offset_type>*>(scratch_space);
   offsets[0].store(0, std::memory_order_relaxed);
@@ -297,8 +299,9 @@ template <typename offset_type>
 std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(uint8_t* scratch_space,
                                                            offset_type value_size) {
-  auto* offsets = scratch_space;
-  auto* sizes = scratch_space + sizeof(offset_type);
+  // The scalar scratch space could be filled concurrently (with the same content), thus
+  // we use relaxed atomic stores. This consequently requires the size of the atomic to
+  // match the size of the offset type.
   static_assert(sizeof(std::atomic<offset_type>) == sizeof(offset_type));
   auto* offsets = reinterpret_cast<std::atomic<offset_type>*>(scratch_space);
   auto* sizes = offsets + 1;

From d86c955341a0f19cdd70465306ed9d58271b6a04 Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Tue, 27 Feb 2024 15:14:04 +0800
Subject: [PATCH 08/14] Update test

---
 cpp/src/arrow/scalar_test.cc | 76 +++++++++++++-----------------------
 1 file changed, 27 insertions(+), 49 deletions(-)

diff --git a/cpp/src/arrow/scalar_test.cc b/cpp/src/arrow/scalar_test.cc
index 1fcabb07fa9..411bbce55c4 100644
--- a/cpp/src/arrow/scalar_test.cc
+++ b/cpp/src/arrow/scalar_test.cc
@@ -1985,56 +1985,34 @@ TEST_F(TestExtensionScalar, ValidateErrors) {
   AssertValidationFails(scalar);
 }
 
-template <typename T>
-struct TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits {
-  using DataType = T;
-  using ScalarType = typename TypeTraits<T>::ScalarType;
-
-  static std::shared_ptr<Scalar> MakeScalar() {
-    std::string value = "test data";
-    return std::make_shared<ScalarType>(value);
+TEST(TestScalarFillArraySpan, ParallelFill) {
+  for (auto scalar : {
+           ScalarFromJSON(utf8(), R"("test data")"),
+           // ScalarFromJSON(utf8_view(), R"("test data")"),
+           // ScalarFromJSON(list(int8()), "[1, 2, 3]"),
+           // ScalarFromJSON(list_view(int8()), "[1, 2, 3]"),
+           // TODO: more coming.
+       }) {
+    ARROW_SCOPED_TRACE("Scalar: ", scalar->ToString());
+
+    // Lambda to fill an ArraySpan with the scalar (and consequently fill the scratch
+    // space of the scalar), and use the ArraySpan a bit.
+    auto array_span_from_scalar = [&scalar]() {
+      ArraySpan span;
+      span.FillFromScalar(*scalar);
+      ASSERT_TRUE(span.type->Equals(scalar->type));
+      ASSERT_EQ(span.length, 1);
+      auto values = span.GetValues<int32_t>(1);
+      ASSERT_EQ(values[0], 0);
+    };
+
+    // Two concurrent calls to the lambda are just enough for TSAN to report a race
+    // condition.
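+    auto fut1 = std::async(std::launch::async, array_span_from_scalar);
+    auto fut2 = std::async(std::launch::async, array_span_from_scalar);
+    fut1.get();
+    fut2.get();
+  }
-};
-
-using TestFillArraySpanBaseBinaryOrBinaryViewLikeTypes =
-    ::testing::Types<TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<BinaryType>,
-                     TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<LargeBinaryType>,
-                     TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<StringType>,
-                     TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<LargeStringType>,
-                     TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<BinaryViewType>,
-                     TestFillArraySpanBaseBinaryOrBinaryViewLikeTraits<StringViewType>>;
-
-template <typename T>
-class TestFillArraySpan : public ::testing::Test {
- public:
-  TestFillArraySpan() = default;
-};
-
-TYPED_TEST_SUITE(TestFillArraySpan, TestFillArraySpanBaseBinaryOrBinaryViewLikeTypes);
-
-// GH-40069: race condition when filling the scratch space of a scalar in parallel.
-TYPED_TEST(TestFillArraySpan, ParallelFill) {
-  using DataType = typename TypeParam::DataType;
-  // using ddScalarType = typename TypeParam::ScalarType;
-
-  auto scalar_val = TypeParam::MakeScalar();
-
-  // Lambda to fill an ArraySpan with the scalar (and consequently fill the scratch
-  // space of the scalar), and use the ArraySpan a bit.
-  auto array_span_from_scalar = [&]() {
-    auto expected_type = TypeTraits<DataType>::type_singleton();
-
-    ArraySpan span;
-    span.FillFromScalar(*scalar_val);
-    ASSERT_TRUE(span.type->Equals(*expected_type));
-    ASSERT_EQ(span.length, 1);
-  };
-  // Two concurrent calls to the lambda are just enough for TSAN to report a race
-  // condition.
-  auto fut1 = std::async(std::launch::async, array_span_from_scalar);
-  auto fut2 = std::async(std::launch::async, array_span_from_scalar);
-  fut1.wait();
-  fut2.wait();
 }
 
 } // namespace arrow

Patch 08 drops the typed test suite in favor of a flat list of scalars built with ScalarFromJSON (from arrow/testing/gtest_util.h), which makes it easy to grow coverage as more scratch-space users become race-free; the commented-out entries and the TODO mark exactly that. A hypothetical extension (not part of the patch; the added cases mirror the commented-out ones) might look like:

    // Sketch: same loop once more scalar types are safe to fill concurrently.
    TEST(TestScalarFillArraySpan, ParallelFillMoreTypes) {
      for (auto scalar : {
               ScalarFromJSON(large_utf8(), R"("test data")"),
               ScalarFromJSON(list(int8()), "[1, 2, 3]"),
           }) {
        ARROW_SCOPED_TRACE("Scalar: ", scalar->ToString());
        auto fill = [&scalar]() {
          ArraySpan span;
          span.FillFromScalar(*scalar);
          ASSERT_EQ(span.length, 1);
        };
        auto fut1 = std::async(std::launch::async, fill);
        auto fut2 = std::async(std::launch::async, fill);
        fut1.get();
        fut2.get();
      }
    }

Note also the switch from fut.wait() to fut.get(): get() rethrows any exception raised on the worker thread, so a failure inside FillFromScalar surfaces in the test instead of being silently dropped.
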
From 6ee5d124bed16f8dda14ba0ae519a75efc35765d Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Tue, 27 Feb 2024 17:27:48 +0800
Subject: [PATCH 09/14] Use DCLP to prevent race conditions in reading/writing
 scalar scratch space

---
 cpp/src/arrow/array/data.cc | 102 ++++++++++++++++++++----------------
 cpp/src/arrow/scalar.cc     |  12 +++++
 cpp/src/arrow/scalar.h      |  22 ++++++++
 3 files changed, 91 insertions(+), 45 deletions(-)

diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 2c7f5f35686..8ca7ad36e3d 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -284,32 +284,58 @@ void ArraySpan::SetMembers(const ArrayData& data) {
 namespace {
 
 template <typename offset_type>
-BufferSpan OffsetsForScalar(uint8_t* scratch_space, offset_type value_size) {
-  // The scalar scratch space could be filled concurrently (with the same content), thus
-  // we use relaxed atomic stores. This consequently requires the size of the atomic to
-  // match the size of the offset type.
-  static_assert(sizeof(std::atomic<offset_type>) == sizeof(offset_type));
-  auto* offsets = reinterpret_cast<std::atomic<offset_type>*>(scratch_space);
-  offsets[0].store(0, std::memory_order_relaxed);
-  offsets[1].store(value_size, std::memory_order_relaxed);
+BufferSpan OffsetsForScalar(const internal::ArraySpanFillFromScalarScratchSpace& scalar,
+                            offset_type value_size) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
+    offsets[0] = 0;
+    offsets[1] = static_cast<offset_type>(value_size);
+  });
   static_assert(2 * sizeof(offset_type) <= 16);
-  return {scratch_space, sizeof(offset_type) * 2};
+  return {scalar.scratch_space_, sizeof(offset_type) * 2};
 }
 
 template <typename offset_type>
-std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(uint8_t* scratch_space,
-                                                           offset_type value_size) {
-  // The scalar scratch space could be filled concurrently (with the same content), thus
-  // we use relaxed atomic stores. This consequently requires the size of the atomic to
-  // match the size of the offset type.
-  static_assert(sizeof(std::atomic<offset_type>) == sizeof(offset_type));
-  auto* offsets = reinterpret_cast<std::atomic<offset_type>*>(scratch_space);
-  auto* sizes = offsets + 1;
-  offsets[0].store(0, std::memory_order_relaxed);
-  sizes[0].store(value_size, std::memory_order_relaxed);
+std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, offset_type value_size) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    auto* offsets = scratch_space;
+    auto* sizes = scratch_space + sizeof(offset_type);
+    reinterpret_cast<offset_type*>(offsets)[0] = 0;
+    reinterpret_cast<offset_type*>(sizes)[0] = value_size;
+  });
   static_assert(2 * sizeof(offset_type) <= 16);
-  return {BufferSpan{offsets, sizeof(offset_type)},
-          BufferSpan{sizes, sizeof(offset_type)}};
+  return {BufferSpan{scalar.scratch_space_, sizeof(offset_type)},
+          BufferSpan{scalar.scratch_space_ + sizeof(offset_type), sizeof(offset_type)}};
+}
+
+std::pair<BufferSpan, BufferSpan> TypeCodeAndOffsetsForDenseUnionScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, int8_t type_code) {
+  // Dense union needs scratch space to store both offsets and a type code
+  struct UnionScratchSpace {
+    alignas(int64_t) int8_t type_code;
+    alignas(int64_t) uint8_t offsets[sizeof(int32_t) * 2];
+  };
+  auto* union_scratch_space =
+      reinterpret_cast<UnionScratchSpace*>(scalar.scratch_space_);
+  scalar.FillScratchSpace([&](uint8_t*) {
+    union_scratch_space->type_code = type_code;
+    reinterpret_cast<int32_t*>(union_scratch_space->offsets)[0] = 0;
+    reinterpret_cast<int32_t*>(union_scratch_space->offsets)[1] = 1;
+  });
+  static_assert(sizeof(UnionScratchSpace) <=
+                sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
+  return {BufferSpan{reinterpret_cast<uint8_t*>(&union_scratch_space->type_code), 1},
+          BufferSpan{reinterpret_cast<uint8_t*>(&union_scratch_space->offsets), 1}};
+}
+
+BufferSpan TypeCodeAndOffsetsForSparseUnionScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, int8_t type_code) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    reinterpret_cast<int8_t*>(scratch_space)[0] = type_code;
+  });
+  static_assert(sizeof(int8_t) <=
+                sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
+  return {scalar.scratch_space_, 1};
 }
 
 int GetNumBuffers(const DataType& type) {
@@ -423,11 +449,10 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       data_size = scalar.value->size();
     }
     if (is_binary_like(type_id)) {
-      this->buffers[1] =
-          OffsetsForScalar(scalar.scratch_space_, static_cast<int32_t>(data_size));
+      this->buffers[1] = OffsetsForScalar(scalar, static_cast<int32_t>(data_size));
     } else {
       // is_large_binary_like
-      this->buffers[1] =
-          OffsetsForScalar(scalar.scratch_space_, data_size);
+      this->buffers[1] = OffsetsForScalar(scalar, data_size);
     }
     this->buffers[2].data = const_cast<uint8_t*>(data_buffer);
     this->buffers[2].size = data_size;
@@ -465,16 +490,15 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
     }
 
     if (type_id == Type::LIST || type_id == Type::MAP) {
-      this->buffers[1] =
-          OffsetsForScalar(scalar.scratch_space_, static_cast<int32_t>(value_length));
+      this->buffers[1] = OffsetsForScalar(scalar, static_cast<int32_t>(value_length));
     } else if (type_id == Type::LARGE_LIST) {
-      this->buffers[1] = OffsetsForScalar(scalar.scratch_space_, value_length);
+      this->buffers[1] = OffsetsForScalar(scalar, value_length);
     } else if (type_id == Type::LIST_VIEW) {
-      std::tie(this->buffers[1], this->buffers[2]) = OffsetsAndSizesForScalar(
-          scalar.scratch_space_, static_cast<int32_t>(value_length));
+      std::tie(this->buffers[1], this->buffers[2]) =
+          OffsetsAndSizesForScalar(scalar, static_cast<int32_t>(value_length));
     } else if (type_id == Type::LARGE_LIST_VIEW) {
       std::tie(this->buffers[1], this->buffers[2]) =
-          OffsetsAndSizesForScalar(scalar.scratch_space_, value_length);
+          OffsetsAndSizesForScalar(scalar, value_length);
     } else {
       DCHECK_EQ(type_id, Type::FIXED_SIZE_LIST);
       // FIXED_SIZE_LIST: does not have a second buffer
@@ -488,27 +512,14 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       this->child_data[i].FillFromScalar(*scalar.value[i]);
     }
   } else if (is_union(type_id)) {
-    // Dense union needs scratch space to store both offsets and a type code
-    struct UnionScratchSpace {
-      alignas(int64_t) int8_t type_code;
-      alignas(int64_t) uint8_t offsets[sizeof(int32_t) * 2];
-    };
-    static_assert(sizeof(UnionScratchSpace) <= sizeof(UnionScalar::scratch_space_));
-    auto* union_scratch_space = reinterpret_cast<UnionScratchSpace*>(
-        &checked_cast<const UnionScalar&>(value).scratch_space_);
-
     // First buffer is kept null since unions have no validity vector
     this->buffers[0] = {};
 
-    union_scratch_space->type_code = checked_cast<const UnionScalar&>(value).type_code;
-    this->buffers[1].data = reinterpret_cast<uint8_t*>(&union_scratch_space->type_code);
-    this->buffers[1].size = 1;
-
     this->child_data.resize(this->type->num_fields());
     if (type_id == Type::DENSE_UNION) {
       const auto& scalar = checked_cast<const DenseUnionScalar&>(value);
-      this->buffers[2] =
-          OffsetsForScalar(union_scratch_space->offsets, static_cast<int32_t>(1));
+      std::tie(this->buffers[1], this->buffers[2]) =
+          TypeCodeAndOffsetsForDenseUnionScalar(scalar, scalar.type_code);
       // We can't "see" the other arrays in the union, but we put the "active"
       // union array in the right place and fill zero-length arrays for the
       // others
@@ -525,6 +536,7 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       }
     } else {
       const auto& scalar = checked_cast<const SparseUnionScalar&>(value);
+      TypeCodeAndOffsetsForSparseUnionScalar(scalar, scalar.type_code);
       // Sparse union scalars have a full complement of child values even
       // though only one of them is relevant, so we just fill them in here
       for (int i = 0; i < static_cast<int>(this->child_data.size()); ++i) {
diff --git a/cpp/src/arrow/scalar.cc b/cpp/src/arrow/scalar.cc
index 6996b46c8b6..e4766a30c1e 100644
--- a/cpp/src/arrow/scalar.cc
+++ b/cpp/src/arrow/scalar.cc
@@ -554,6 +554,18 @@ Status Scalar::ValidateFull() const {
   return ScalarValidateImpl(/*full_validation=*/true).Validate(*this);
 }
 
+namespace internal {
+void ArraySpanFillFromScalarScratchSpace::FillScratchSpace(FillScratchSpaceFn fn) const {
+  if (!scratch_space_fill_.filled_.load(std::memory_order_acquire)) {
+    std::lock_guard lock(scratch_space_fill_.mutex_);
+    if (!scratch_space_fill_.filled_.load(std::memory_order_relaxed)) {
+      fn(scratch_space_);
+      scratch_space_fill_.filled_.store(true, std::memory_order_release);
+    }
+  }
+}
+}  // namespace internal
+
 BaseBinaryScalar::BaseBinaryScalar(std::string s, std::shared_ptr<DataType> type)
     : BaseBinaryScalar(Buffer::FromString(std::move(s)), std::move(type)) {}
diff --git a/cpp/src/arrow/scalar.h b/cpp/src/arrow/scalar.h
index 65c5ee4df0a..e40691b5f17 100644
--- a/cpp/src/arrow/scalar.h
+++ b/cpp/src/arrow/scalar.h
@@ -22,6 +22,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -136,6 +137,27 @@ struct ARROW_EXPORT ArraySpanFillFromScalarScratchSpace {
   // Scalar- including binary scalars where we need to create a buffer
   // that looks like two 32-bit or 64-bit offsets.
   alignas(int64_t) mutable uint8_t scratch_space_[sizeof(int64_t) * 2];
+
+  using FillScratchSpaceFn = std::function<void(uint8_t*)>;
+  // Race-free filling of the scratch space.
+  void FillScratchSpace(FillScratchSpaceFn fn) const;
+
+ private:
+  // Structure using DCLP to avoid race conditions when reading/writing the
+  // scratch space.
+  struct ScratchSpaceFill {
+    std::atomic<bool> filled_{false};
+    std::mutex mutex_;
+
+    // Boilerplate code to make this structure default-constructible/copyable/movable, so
+    // that it doesn't affect the default-constructibility/copyability/movability of the
+    // containing class.
+    ScratchSpaceFill() = default;
+    ScratchSpaceFill(const ScratchSpaceFill&) {}
+    ScratchSpaceFill(ScratchSpaceFill&& other) {}
+    ScratchSpaceFill& operator=(const ScratchSpaceFill&) { return *this; }
+    ScratchSpaceFill& operator=(ScratchSpaceFill&&) { return *this; }
+  } mutable scratch_space_fill_;
 };
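
Patch 09 replaces the relaxed-store approach entirely with the double-checked locking pattern (DCLP): the scratch space is now filled exactly once behind an atomic flag, so writers no longer race at all, same-value or not. The shape of the idiom, reduced to its essentials (a standalone sketch, not the Arrow code):

    #include <atomic>
    #include <mutex>

    template <typename Init>
    void CallOnceDclp(std::atomic<bool>& done, std::mutex& mu, Init init) {
      if (!done.load(std::memory_order_acquire)) {    // fast path: no lock once done
        std::lock_guard<std::mutex> lock(mu);
        if (!done.load(std::memory_order_relaxed)) {  // re-check under the lock
          init();                                     // runs exactly once
          done.store(true, std::memory_order_release);
        }
      }
    }

The acquire load pairs with the release store, so any thread that observes done == true also observes the initialized bytes. std::call_once with a std::once_flag would express the same thing, but once_flag is neither copyable nor movable; the hand-rolled ScratchSpaceFill with its no-op copy/move boilerplate keeps Scalar copyable, which is presumably why the patch rolls its own.
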
From 35491e752ba5b7bba1d27ca89cb150e38c0532c6 Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Tue, 27 Feb 2024 17:51:18 +0800
Subject: [PATCH 10/14] Remove useless header

---
 cpp/src/arrow/scalar.h | 1 -
 1 file changed, 1 deletion(-)

diff --git a/cpp/src/arrow/scalar.h b/cpp/src/arrow/scalar.h
index e40691b5f17..339d8b4a5e9 100644
--- a/cpp/src/arrow/scalar.h
+++ b/cpp/src/arrow/scalar.h
@@ -22,7 +22,6 @@
 #include
 #include
-#include
 #include
 #include
 #include

From 56b46dc5729653cc6b012b879757785d5935b8ad Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Tue, 27 Feb 2024 19:04:43 +0800
Subject: [PATCH 11/14] Fix

---
 cpp/src/arrow/array/data.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 8ca7ad36e3d..4affa20d5bb 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -536,7 +536,7 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
     } else {
       const auto& scalar = checked_cast<const SparseUnionScalar&>(value);
-      TypeCodeAndOffsetsForSparseUnionScalar(scalar, scalar.type_code);
+      this->buffers[1] = TypeCodeAndOffsetsForSparseUnionScalar(scalar, scalar.type_code);
       // Sparse union scalars have a full complement of child values even
       // though only one of them is relevant, so we just fill them in here

From cd896d7798387de2ade61bde068b87ec59ab0234 Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Tue, 27 Feb 2024 19:11:13 +0800
Subject: [PATCH 12/14] Fix run end type

---
 cpp/src/arrow/array/data.cc | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 4affa20d5bb..3048431b9eb 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -338,6 +338,17 @@ BufferSpan
TypeCodeAndOffsetsForSparseUnionScalar(
   return {scalar.scratch_space_, 1};
 }
 
+template <typename run_end_type>
+BufferSpan RunEndForRunEndEncodedScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, run_end_type run_end) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    reinterpret_cast<run_end_type*>(scratch_space)[0] = run_end;
+  });
+  static_assert(sizeof(run_end_type) <=
+                sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
+  return {scalar.scratch_space_, sizeof(run_end_type)};
+}
+
 int GetNumBuffers(const DataType& type) {
   switch (type.id()) {
     case Type::NA:
@@ -559,9 +570,7 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       e.type = scalar.run_end_type().get();
       e.length = 1;
       e.null_count = 0;
-      e.buffers[1].data = scalar.scratch_space_;
-      e.buffers[1].size = sizeof(run_end);
-      reinterpret_cast<decltype(run_end)*>(scalar.scratch_space_)[0] = run_end;
+      e.buffers[1] = RunEndForRunEndEncodedScalar(scalar, run_end);
     };
 
     switch (scalar.run_end_type()->id()) {

From 8b42d7552257d1322658b9d6cb29f73fd676e66c Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Tue, 27 Feb 2024 19:13:23 +0800
Subject: [PATCH 13/14] Add mutex header

---
 cpp/src/arrow/scalar.h | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cpp/src/arrow/scalar.h b/cpp/src/arrow/scalar.h
index 339d8b4a5e9..e40691b5f17 100644
--- a/cpp/src/arrow/scalar.h
+++ b/cpp/src/arrow/scalar.h
@@ -22,6 +22,7 @@
 #include
 #include
+#include <mutex>
 #include
 #include
 #include

From df8f91abf9a9b2b6c45f23d26789253a85cfc332 Mon Sep 17 00:00:00 2001
From: Ruoxi Sun
Date: Tue, 27 Feb 2024 19:47:56 +0800
Subject: [PATCH 14/14] Fix dense union buffer 2

---
 cpp/src/arrow/array/data.cc | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 3048431b9eb..89fc9382e3c 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -325,7 +325,8 @@ std::pair<BufferSpan, BufferSpan> TypeCodeAndOffsetsForDenseUnionScalar(
   static_assert(sizeof(UnionScratchSpace) <=
                 sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
   return {BufferSpan{reinterpret_cast<uint8_t*>(&union_scratch_space->type_code), 1},
-          BufferSpan{reinterpret_cast<uint8_t*>(&union_scratch_space->offsets), 1}};
+          BufferSpan{reinterpret_cast<uint8_t*>(&union_scratch_space->offsets),
+                     sizeof(int32_t)}};
 }
 
 BufferSpan TypeCodeAndOffsetsForSparseUnionScalar(
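
The final fix corrects the size of the dense union's offsets buffer: buffer 2 of a length-1 dense union views one int32_t offset, so its span must cover sizeof(int32_t) bytes, not 1. A standalone sketch of the scratch layout the series settles on, with the arithmetic spelled out (assuming the usual 8-byte int64_t alignment):

    #include <cstdint>

    struct UnionScratchSpace {
      alignas(int64_t) int8_t type_code;                      // byte 0, padded to 8
      alignas(int64_t) uint8_t offsets[sizeof(int32_t) * 2];  // bytes 8..15
    };

    // The whole struct fits the scalar's 16-byte scratch space exactly ...
    static_assert(sizeof(UnionScratchSpace) == 2 * sizeof(int64_t));
    // ... and the offsets region holds two 4-byte offsets, of which buffer 2
    // exposes the first: hence a BufferSpan of size sizeof(int32_t), not 1.

    int main() {}
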