diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 80c411dfa6a..89fc9382e3c 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -284,24 +284,70 @@ void ArraySpan::SetMembers(const ArrayData& data) {
 namespace {
 
 template <typename offset_type>
-BufferSpan OffsetsForScalar(uint8_t* scratch_space, offset_type value_size) {
-  auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
-  offsets[0] = 0;
-  offsets[1] = static_cast<offset_type>(value_size);
+BufferSpan OffsetsForScalar(const internal::ArraySpanFillFromScalarScratchSpace& scalar,
+                            offset_type value_size) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
+    offsets[0] = 0;
+    offsets[1] = static_cast<offset_type>(value_size);
+  });
   static_assert(2 * sizeof(offset_type) <= 16);
-  return {scratch_space, sizeof(offset_type) * 2};
+  return {scalar.scratch_space_, sizeof(offset_type) * 2};
 }
 
 template <typename offset_type>
-std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(uint8_t* scratch_space,
-                                                           offset_type value_size) {
-  auto* offsets = scratch_space;
-  auto* sizes = scratch_space + sizeof(offset_type);
-  reinterpret_cast<offset_type*>(offsets)[0] = 0;
-  reinterpret_cast<offset_type*>(sizes)[0] = value_size;
+std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, offset_type value_size) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    auto* offsets = scratch_space;
+    auto* sizes = scratch_space + sizeof(offset_type);
+    reinterpret_cast<offset_type*>(offsets)[0] = 0;
+    reinterpret_cast<offset_type*>(sizes)[0] = value_size;
+  });
   static_assert(2 * sizeof(offset_type) <= 16);
-  return {BufferSpan{offsets, sizeof(offset_type)},
-          BufferSpan{sizes, sizeof(offset_type)}};
+  return {BufferSpan{scalar.scratch_space_, sizeof(offset_type)},
+          BufferSpan{scalar.scratch_space_ + sizeof(offset_type), sizeof(offset_type)}};
+}
+
+std::pair<BufferSpan, BufferSpan> TypeCodeAndOffsetsForDenseUnionScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, int8_t type_code) {
+  // Dense union needs scratch space to store both offsets and a type code
+  struct UnionScratchSpace {
+    alignas(int64_t) int8_t type_code;
+    alignas(int64_t) uint8_t offsets[sizeof(int32_t) * 2];
+  };
+  auto* union_scratch_space = reinterpret_cast<UnionScratchSpace*>(scalar.scratch_space_);
+  scalar.FillScratchSpace([&](uint8_t*) {
+    union_scratch_space->type_code = type_code;
+    reinterpret_cast<int32_t*>(union_scratch_space->offsets)[0] = 0;
+    reinterpret_cast<int32_t*>(union_scratch_space->offsets)[1] = 1;
+  });
+  static_assert(sizeof(UnionScratchSpace) <=
+                sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
+  return {BufferSpan{reinterpret_cast<uint8_t*>(&union_scratch_space->type_code), 1},
+          BufferSpan{reinterpret_cast<uint8_t*>(&union_scratch_space->offsets),
+                     sizeof(int32_t)}};
+}
+
+BufferSpan TypeCodeAndOffsetsForSparseUnionScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, int8_t type_code) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    reinterpret_cast<int8_t*>(scratch_space)[0] = type_code;
+  });
+  static_assert(sizeof(int8_t) <=
+                sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
+  return {scalar.scratch_space_, 1};
+}
+
+template <typename run_end_type>
+BufferSpan RunEndForRunEndEncodedScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, run_end_type run_end) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    reinterpret_cast<run_end_type*>(scratch_space)[0] = run_end;
+  });
+  static_assert(sizeof(run_end_type) <=
+                sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
+  return {scalar.scratch_space_, sizeof(run_end_type)};
 }
 
 int GetNumBuffers(const DataType& type) {
@@ -415,11 +461,10 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       data_size = scalar.value->size();
     }
     if (is_binary_like(type_id)) {
-      this->buffers[1] =
-          OffsetsForScalar(scalar.scratch_space_, static_cast<int32_t>(data_size));
+      this->buffers[1] = OffsetsForScalar(scalar, static_cast<int32_t>(data_size));
     } else {
       // is_large_binary_like
-      this->buffers[1] = OffsetsForScalar(scalar.scratch_space_, data_size);
+      this->buffers[1] = OffsetsForScalar(scalar, data_size);
     }
     this->buffers[2].data = const_cast<uint8_t*>(data_buffer);
     this->buffers[2].size = data_size;
@@ -457,16 +502,15 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
     }
 
    if (type_id == Type::LIST || type_id == Type::MAP) {
-      this->buffers[1] =
-          OffsetsForScalar(scalar.scratch_space_, static_cast<int32_t>(value_length));
+      this->buffers[1] = OffsetsForScalar(scalar, static_cast<int32_t>(value_length));
     } else if (type_id == Type::LARGE_LIST) {
-      this->buffers[1] = OffsetsForScalar(scalar.scratch_space_, value_length);
+      this->buffers[1] = OffsetsForScalar(scalar, value_length);
     } else if (type_id == Type::LIST_VIEW) {
-      std::tie(this->buffers[1], this->buffers[2]) = OffsetsAndSizesForScalar(
-          scalar.scratch_space_, static_cast<int32_t>(value_length));
+      std::tie(this->buffers[1], this->buffers[2]) =
+          OffsetsAndSizesForScalar(scalar, static_cast<int32_t>(value_length));
     } else if (type_id == Type::LARGE_LIST_VIEW) {
       std::tie(this->buffers[1], this->buffers[2]) =
-          OffsetsAndSizesForScalar(scalar.scratch_space_, value_length);
+          OffsetsAndSizesForScalar(scalar, value_length);
     } else {
       DCHECK_EQ(type_id, Type::FIXED_SIZE_LIST);
       // FIXED_SIZE_LIST: does not have a second buffer
@@ -480,27 +524,14 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       this->child_data[i].FillFromScalar(*scalar.value[i]);
     }
   } else if (is_union(type_id)) {
-    // Dense union needs scratch space to store both offsets and a type code
-    struct UnionScratchSpace {
-      alignas(int64_t) int8_t type_code;
-      alignas(int64_t) uint8_t offsets[sizeof(int32_t) * 2];
-    };
-    static_assert(sizeof(UnionScratchSpace) <= sizeof(UnionScalar::scratch_space_));
-    auto* union_scratch_space = reinterpret_cast<UnionScratchSpace*>(
-        &checked_cast<const UnionScalar&>(value).scratch_space_);
-
     // First buffer is kept null since unions have no validity vector
     this->buffers[0] = {};
 
-    union_scratch_space->type_code = checked_cast<const UnionScalar&>(value).type_code;
-    this->buffers[1].data = reinterpret_cast<uint8_t*>(&union_scratch_space->type_code);
-    this->buffers[1].size = 1;
-
     this->child_data.resize(this->type->num_fields());
     if (type_id == Type::DENSE_UNION) {
       const auto& scalar = checked_cast<const DenseUnionScalar&>(value);
-      this->buffers[2] =
-          OffsetsForScalar(union_scratch_space->offsets, static_cast<int32_t>(1));
+      std::tie(this->buffers[1], this->buffers[2]) =
+          TypeCodeAndOffsetsForDenseUnionScalar(scalar, scalar.type_code);
       // We can't "see" the other arrays in the union, but we put the "active"
       // union array in the right place and fill zero-length arrays for the
       // others
@@ -517,6 +548,7 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       }
     } else {
       const auto& scalar = checked_cast<const SparseUnionScalar&>(value);
+      this->buffers[1] = TypeCodeAndOffsetsForSparseUnionScalar(scalar, scalar.type_code);
       // Sparse union scalars have a full complement of child values even
       // though only one of them is relevant, so we just fill them in here
       for (int i = 0; i < static_cast<int>(this->child_data.size()); ++i) {
@@ -539,9 +571,7 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       e.type = scalar.run_end_type().get();
       e.length = 1;
       e.null_count = 0;
-      e.buffers[1].data = scalar.scratch_space_;
-      e.buffers[1].size = sizeof(run_end);
-      reinterpret_cast<decltype(run_end)*>(scalar.scratch_space_)[0] = run_end;
+      e.buffers[1] = RunEndForRunEndEncodedScalar(scalar, run_end);
     };
 
     switch (scalar.run_end_type()->id()) {
diff --git a/cpp/src/arrow/scalar.cc b/cpp/src/arrow/scalar.cc
index 6996b46c8b6..e4766a30c1e 100644
--- a/cpp/src/arrow/scalar.cc
+++ b/cpp/src/arrow/scalar.cc
@@ -554,6 +554,18 @@ Status Scalar::ValidateFull() const {
   return ScalarValidateImpl(/*full_validation=*/true).Validate(*this);
 }
 
+namespace internal {
+void ArraySpanFillFromScalarScratchSpace::FillScratchSpace(FillScratchSpaceFn fn) const {
+  if (!scratch_space_fill_.filled_.load(std::memory_order_acquire)) {
+    std::lock_guard<std::mutex> lock(scratch_space_fill_.mutex_);
+    if (!scratch_space_fill_.filled_.load(std::memory_order_relaxed)) {
+      fn(scratch_space_);
+      scratch_space_fill_.filled_.store(true, std::memory_order_release);
+    }
+  }
+}
+}  // namespace internal
+
 BaseBinaryScalar::BaseBinaryScalar(std::string s, std::shared_ptr<DataType> type)
     : BaseBinaryScalar(Buffer::FromString(std::move(s)), std::move(type)) {}
 
diff --git a/cpp/src/arrow/scalar.h b/cpp/src/arrow/scalar.h
index 65c5ee4df0a..e40691b5f17 100644
--- a/cpp/src/arrow/scalar.h
+++ b/cpp/src/arrow/scalar.h
@@ -22,6 +22,7 @@
 #include <iosfwd>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <string_view>
 #include <utility>
 #include <vector>
@@ -136,6 +137,27 @@ struct ARROW_EXPORT ArraySpanFillFromScalarScratchSpace {
   // Scalar- including binary scalars where we need to create a buffer
   // that looks like two 32-bit or 64-bit offsets.
   alignas(int64_t) mutable uint8_t scratch_space_[sizeof(int64_t) * 2];
+
+  using FillScratchSpaceFn = std::function<void(uint8_t*)>;
+  // Race-free filling of the scratch space.
+  void FillScratchSpace(FillScratchSpaceFn fn) const;
+
+ private:
+  // Structure using DCLP (double-checked locking) to avoid race conditions when
+  // reading/writing the scratch space.
+  struct ScratchSpaceFill {
+    std::atomic<bool> filled_{false};
+    std::mutex mutex_;
+
+    // Boilerplate code to make this structure default-constructible/copyable/movable, so
+    // that it doesn't affect the default-constructibility/copyability/movability of the
+    // containing class.
+    ScratchSpaceFill() = default;
+    ScratchSpaceFill(const ScratchSpaceFill&) {}
+    ScratchSpaceFill(ScratchSpaceFill&& other) {}
+    ScratchSpaceFill& operator=(const ScratchSpaceFill&) { return *this; }
+    ScratchSpaceFill& operator=(ScratchSpaceFill&&) { return *this; }
+  } mutable scratch_space_fill_;
 };
 
 struct ARROW_EXPORT PrimitiveScalarBase : public Scalar {
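Note: FillScratchSpace() above is a textbook double-checked locking pattern (DCLP). The sketch below is a minimal standalone illustration of the same idea, using hypothetical names (LazyScratch, Fill) rather than the Arrow types. The acquire load on the fast path pairs with the release store performed after the scratch bytes are written, so any thread that observes filled_ == true also observes the filled bytes; the relaxed re-check under the lock is sufficient because acquiring the mutex already synchronizes with the unlock done by the thread that performed the fill.

// Standalone illustration of the locking scheme used by
// ArraySpanFillFromScalarScratchSpace::FillScratchSpace(); names are hypothetical.
#include <atomic>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>

struct LazyScratch {
  alignas(int64_t) mutable uint8_t bytes[16];

  // Runs `fn` at most once, even when called concurrently from many threads.
  void Fill(const std::function<void(uint8_t*)>& fn) const {
    if (!filled_.load(std::memory_order_acquire)) {      // fast path: lock-free check
      std::lock_guard<std::mutex> lock(mutex_);
      if (!filled_.load(std::memory_order_relaxed)) {    // re-check under the lock
        fn(bytes);
        filled_.store(true, std::memory_order_release);  // publish the written bytes
      }
    }
  }

 private:
  mutable std::atomic<bool> filled_{false};
  mutable std::mutex mutex_;
};

int main() {
  LazyScratch scratch;
  auto fill_once = [&scratch] {
    scratch.Fill([](uint8_t* bytes) { bytes[0] = 42; });  // executes in exactly one thread
  };
  std::thread t1(fill_once);
  std::thread t2(fill_once);
  t1.join();
  t2.join();
  return scratch.bytes[0] == 42 ? 0 : 1;
}
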
diff --git a/cpp/src/arrow/scalar_test.cc b/cpp/src/arrow/scalar_test.cc
index 09dfde32271..411bbce55c4 100644
--- a/cpp/src/arrow/scalar_test.cc
+++ b/cpp/src/arrow/scalar_test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <cmath>
+#include <future>
 #include <limits>
 #include <memory>
 #include <string>
@@ -1984,4 +1985,34 @@ TEST_F(TestExtensionScalar, ValidateErrors) {
   AssertValidationFails(scalar);
 }
 
+TEST(TestScalarFillArraySpan, ParallelFill) {
+  for (auto scalar : {
+           ScalarFromJSON(utf8(), R"("test data")"),
+           // ScalarFromJSON(utf8_view(), R"("test data")"),
+           // ScalarFromJSON(list(int8()), "[1, 2, 3]"),
+           // ScalarFromJSON(list_view(int8()), "[1, 2, 3]"),
+           // TODO: more coming.
+       }) {
+    ARROW_SCOPED_TRACE("Scalar: ", scalar->ToString());
+
+    // Lambda to fill an ArraySpan with the scalar (and consequently fill the scratch
+    // space of the scalar), and use the ArraySpan a bit.
+    auto array_span_from_scalar = [&scalar]() {
+      ArraySpan span;
+      span.FillFromScalar(*scalar);
+      ASSERT_TRUE(span.type->Equals(scalar->type));
+      ASSERT_EQ(span.length, 1);
+      auto values = span.GetValues<int32_t>(1);
+      ASSERT_EQ(values[0], 0);
+    };
+
+    // Two concurrent calls to the lambda are just enough for TSAN to report a race
+    // condition.
+    auto fut1 = std::async(std::launch::async, array_span_from_scalar);
+    auto fut2 = std::async(std::launch::async, array_span_from_scalar);
+    fut1.get();
+    fut2.get();
+  }
+}
+
 }  // namespace arrow
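For reference, every helper added to data.cc follows the same calling convention: mutate the scratch space only inside the FillScratchSpace() callback, then hand it out read-only through scalar.scratch_space_ when building the returned BufferSpan. Because the callback runs at most once per scalar, a helper must derive the bytes it writes purely from the scalar itself; later calls skip the callback and reuse the first result. Below is a hedged sketch of what a new helper written against this API could look like; SingleInt32ForScalar is a hypothetical name, and the snippet assumes it sits in the anonymous namespace of data.cc with the usual Arrow headers available.

// Hypothetical helper in the style of OffsetsForScalar(); not part of the patch.
BufferSpan SingleInt32ForScalar(
    const internal::ArraySpanFillFromScalarScratchSpace& scalar, int32_t value) {
  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
    // Write the scratch bytes only here; this runs at most once per scalar.
    reinterpret_cast<int32_t*>(scratch_space)[0] = value;
  });
  static_assert(sizeof(int32_t) <=
                sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
  // From here on, only read the (now published) scratch space.
  return {scalar.scratch_space_, sizeof(int32_t)};
}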