110 changes: 70 additions & 40 deletions cpp/src/arrow/array/data.cc
@@ -284,24 +284,70 @@ void ArraySpan::SetMembers(const ArrayData& data) {
 namespace {
 
 template <typename offset_type>
-BufferSpan OffsetsForScalar(uint8_t* scratch_space, offset_type value_size) {
-  auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
-  offsets[0] = 0;
-  offsets[1] = static_cast<offset_type>(value_size);
+BufferSpan OffsetsForScalar(const internal::ArraySpanFillFromScalarScratchSpace& scalar,
+                            offset_type value_size) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
+    offsets[0] = 0;
+    offsets[1] = static_cast<offset_type>(value_size);
+  });
   static_assert(2 * sizeof(offset_type) <= 16);
-  return {scratch_space, sizeof(offset_type) * 2};
+  return {scalar.scratch_space_, sizeof(offset_type) * 2};
 }
 
 template <typename offset_type>
-std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(uint8_t* scratch_space,
-                                                           offset_type value_size) {
-  auto* offsets = scratch_space;
-  auto* sizes = scratch_space + sizeof(offset_type);
-  reinterpret_cast<offset_type*>(offsets)[0] = 0;
-  reinterpret_cast<offset_type*>(sizes)[0] = value_size;
+std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, offset_type value_size) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    auto* offsets = scratch_space;
+    auto* sizes = scratch_space + sizeof(offset_type);
+    reinterpret_cast<offset_type*>(offsets)[0] = 0;
+    reinterpret_cast<offset_type*>(sizes)[0] = value_size;
+  });
   static_assert(2 * sizeof(offset_type) <= 16);
-  return {BufferSpan{offsets, sizeof(offset_type)},
-          BufferSpan{sizes, sizeof(offset_type)}};
+  return {BufferSpan{scalar.scratch_space_, sizeof(offset_type)},
+          BufferSpan{scalar.scratch_space_ + sizeof(offset_type), sizeof(offset_type)}};
 }
 
+std::pair<BufferSpan, BufferSpan> TypeCodeAndOffsetsForDenseUnionScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, int8_t type_code) {
+  // Dense union needs scratch space to store both offsets and a type code
+  struct UnionScratchSpace {
+    alignas(int64_t) int8_t type_code;
+    alignas(int64_t) uint8_t offsets[sizeof(int32_t) * 2];
+  };
+  auto* union_scratch_space = reinterpret_cast<UnionScratchSpace*>(scalar.scratch_space_);
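+  // Note: the lambda below ignores its scratch-space argument because
+  // union_scratch_space already aliases the same bytes (scalar.scratch_space_).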
+  scalar.FillScratchSpace([&](uint8_t*) {
+    union_scratch_space->type_code = type_code;
+    reinterpret_cast<int32_t*>(union_scratch_space->offsets)[0] = 0;
+    reinterpret_cast<int32_t*>(union_scratch_space->offsets)[1] = 1;
+  });
+  static_assert(sizeof(UnionScratchSpace) <=
+                sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
+  return {BufferSpan{reinterpret_cast<uint8_t*>(&union_scratch_space->type_code), 1},
+          BufferSpan{reinterpret_cast<uint8_t*>(&union_scratch_space->offsets),
+                     sizeof(int32_t)}};
+}
+
+BufferSpan TypeCodeAndOffsetsForSparseUnionScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, int8_t type_code) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    reinterpret_cast<int8_t*>(scratch_space)[0] = type_code;
+  });
+  static_assert(sizeof(int8_t) <=
+                sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
+  return {scalar.scratch_space_, 1};
+}
+
+template <typename run_end_type>
+BufferSpan RunEndForRunEndEncodedScalar(
+    const internal::ArraySpanFillFromScalarScratchSpace& scalar, run_end_type run_end) {
+  scalar.FillScratchSpace([&](uint8_t* scratch_space) {
+    reinterpret_cast<run_end_type*>(scratch_space)[0] = run_end;
+  });
+  static_assert(sizeof(run_end_type) <=
+                sizeof(internal::ArraySpanFillFromScalarScratchSpace::scratch_space_));
+  return {scalar.scratch_space_, sizeof(run_end_type)};
+}
 
 int GetNumBuffers(const DataType& type) {
@@ -415,11 +461,10 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       data_size = scalar.value->size();
     }
     if (is_binary_like(type_id)) {
-      this->buffers[1] =
-          OffsetsForScalar(scalar.scratch_space_, static_cast<int32_t>(data_size));
+      this->buffers[1] = OffsetsForScalar(scalar, static_cast<int32_t>(data_size));
     } else {
       // is_large_binary_like
-      this->buffers[1] = OffsetsForScalar(scalar.scratch_space_, data_size);
+      this->buffers[1] = OffsetsForScalar(scalar, data_size);
     }
     this->buffers[2].data = const_cast<uint8_t*>(data_buffer);
     this->buffers[2].size = data_size;
Expand Down Expand Up @@ -457,16 +502,15 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
     }
 
     if (type_id == Type::LIST || type_id == Type::MAP) {
-      this->buffers[1] =
-          OffsetsForScalar(scalar.scratch_space_, static_cast<int32_t>(value_length));
+      this->buffers[1] = OffsetsForScalar(scalar, static_cast<int32_t>(value_length));
     } else if (type_id == Type::LARGE_LIST) {
-      this->buffers[1] = OffsetsForScalar(scalar.scratch_space_, value_length);
+      this->buffers[1] = OffsetsForScalar(scalar, value_length);
     } else if (type_id == Type::LIST_VIEW) {
-      std::tie(this->buffers[1], this->buffers[2]) = OffsetsAndSizesForScalar(
-          scalar.scratch_space_, static_cast<int32_t>(value_length));
+      std::tie(this->buffers[1], this->buffers[2]) =
+          OffsetsAndSizesForScalar(scalar, static_cast<int32_t>(value_length));
     } else if (type_id == Type::LARGE_LIST_VIEW) {
       std::tie(this->buffers[1], this->buffers[2]) =
-          OffsetsAndSizesForScalar(scalar.scratch_space_, value_length);
+          OffsetsAndSizesForScalar(scalar, value_length);
     } else {
       DCHECK_EQ(type_id, Type::FIXED_SIZE_LIST);
       // FIXED_SIZE_LIST: does not have a second buffer
@@ -480,27 +524,14 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       this->child_data[i].FillFromScalar(*scalar.value[i]);
     }
   } else if (is_union(type_id)) {
-    // Dense union needs scratch space to store both offsets and a type code
-    struct UnionScratchSpace {
-      alignas(int64_t) int8_t type_code;
-      alignas(int64_t) uint8_t offsets[sizeof(int32_t) * 2];
-    };
-    static_assert(sizeof(UnionScratchSpace) <= sizeof(UnionScalar::scratch_space_));
-    auto* union_scratch_space = reinterpret_cast<UnionScratchSpace*>(
-        &checked_cast<const UnionScalar&>(value).scratch_space_);
-
     // First buffer is kept null since unions have no validity vector
     this->buffers[0] = {};
 
-    union_scratch_space->type_code = checked_cast<const UnionScalar&>(value).type_code;
-    this->buffers[1].data = reinterpret_cast<uint8_t*>(&union_scratch_space->type_code);
-    this->buffers[1].size = 1;
-
     this->child_data.resize(this->type->num_fields());
     if (type_id == Type::DENSE_UNION) {
       const auto& scalar = checked_cast<const DenseUnionScalar&>(value);
-      this->buffers[2] =
-          OffsetsForScalar(union_scratch_space->offsets, static_cast<int32_t>(1));
+      std::tie(this->buffers[1], this->buffers[2]) =
+          TypeCodeAndOffsetsForDenseUnionScalar(scalar, scalar.type_code);
       // We can't "see" the other arrays in the union, but we put the "active"
       // union array in the right place and fill zero-length arrays for the
       // others
@@ -517,6 +548,7 @@
       }
     } else {
       const auto& scalar = checked_cast<const SparseUnionScalar&>(value);
+      this->buffers[1] = TypeCodeAndOffsetsForSparseUnionScalar(scalar, scalar.type_code);
       // Sparse union scalars have a full complement of child values even
       // though only one of them is relevant, so we just fill them in here
       for (int i = 0; i < static_cast<int>(this->child_data.size()); ++i) {
@@ -539,9 +571,7 @@
       e.type = scalar.run_end_type().get();
       e.length = 1;
       e.null_count = 0;
-      e.buffers[1].data = scalar.scratch_space_;
-      e.buffers[1].size = sizeof(run_end);
-      reinterpret_cast<decltype(run_end)*>(scalar.scratch_space_)[0] = run_end;
+      e.buffers[1] = RunEndForRunEndEncodedScalar(scalar, run_end);
     };
 
     switch (scalar.run_end_type()->id()) {
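
Note on the pattern above: every helper now funnels its one-time write through
FillScratchSpace() instead of writing to scratch_space_ directly, so the first
caller publishes the bytes and any concurrent caller only reads them. A minimal
self-contained sketch of the same idiom (hypothetical names, not the Arrow API):

    #include <atomic>
    #include <cstdint>
    #include <functional>
    #include <mutex>

    struct LazyScratch {
      alignas(int64_t) mutable uint8_t bytes[16];

      // fn runs at most once, even with concurrent callers.
      void Fill(const std::function<void(uint8_t*)>& fn) const {
        if (!filled_.load(std::memory_order_acquire)) {    // fast path: no lock
          std::lock_guard<std::mutex> lock(mutex_);
          if (!filled_.load(std::memory_order_relaxed)) {  // re-check under lock
            fn(bytes);
            filled_.store(true, std::memory_order_release);
          }
        }
      }

     private:
      mutable std::atomic<bool> filled_{false};
      mutable std::mutex mutex_;
    };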
12 changes: 12 additions & 0 deletions cpp/src/arrow/scalar.cc
@@ -554,6 +554,18 @@ Status Scalar::ValidateFull() const {
   return ScalarValidateImpl(/*full_validation=*/true).Validate(*this);
 }
 
+namespace internal {
+void ArraySpanFillFromScalarScratchSpace::FillScratchSpace(FillScratchSpaceFn fn) const {
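+  // Classic double-checked locking: the unsynchronized acquire load keeps the
+  // common already-filled path lock-free and pairs with the release store
+  // below, so a thread that observes filled_ == true also observes the bytes
+  // written by fn. The re-check may be relaxed: the mutex provides ordering.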
+  if (!scratch_space_fill_.filled_.load(std::memory_order_acquire)) {
+    std::lock_guard<std::mutex> lock(scratch_space_fill_.mutex_);
+    if (!scratch_space_fill_.filled_.load(std::memory_order_relaxed)) {
+      fn(scratch_space_);
+      scratch_space_fill_.filled_.store(true, std::memory_order_release);
+    }
+  }
+}
+}  // namespace internal
+
 BaseBinaryScalar::BaseBinaryScalar(std::string s, std::shared_ptr<DataType> type)
     : BaseBinaryScalar(Buffer::FromString(std::move(s)), std::move(type)) {}
 
22 changes: 22 additions & 0 deletions cpp/src/arrow/scalar.h
@@ -22,6 +22,7 @@

+#include <atomic>
+#include <functional>
 #include <iosfwd>
 #include <memory>
+#include <mutex>
 #include <ratio>
 #include <string>
 #include <string_view>
@@ -136,6 +137,27 @@ struct ARROW_EXPORT ArraySpanFillFromScalarScratchSpace {
   // Scalar- including binary scalars where we need to create a buffer
   // that looks like two 32-bit or 64-bit offsets.
   alignas(int64_t) mutable uint8_t scratch_space_[sizeof(int64_t) * 2];
+
+  using FillScratchSpaceFn = std::function<void(uint8_t* scratch_space)>;
+  // Race-free filling of the scratch space.
+  void FillScratchSpace(FillScratchSpaceFn fn) const;
+
+ private:
+  // Structure using DCLP (double-checked locking) to avoid race conditions
+  // when reading/writing the scratch space.
+  struct ScratchSpaceFill {
+    std::atomic<bool> filled_{false};
+    std::mutex mutex_;
+
+    // Boilerplate to keep this struct default-constructible/copyable/movable, so
+    // that it doesn't affect the default-constructibility/copyability/movability
+    // of the containing class.
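+    // None of these copy the flag or the mutex: a newly constructed copy starts
+    // out unfilled and lazily refills its own scratch space on first use.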
+    ScratchSpaceFill() = default;
+    ScratchSpaceFill(const ScratchSpaceFill&) {}
+    ScratchSpaceFill(ScratchSpaceFill&&) {}
+    ScratchSpaceFill& operator=(const ScratchSpaceFill&) { return *this; }
+    ScratchSpaceFill& operator=(ScratchSpaceFill&&) { return *this; }
+  } mutable scratch_space_fill_;
 };
 
 struct ARROW_EXPORT PrimitiveScalarBase : public Scalar {
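
Note: std::mutex and std::atomic are neither copyable nor movable, so without
the hand-written constructors and assignment operators above, any scalar type
embedding ScratchSpaceFill would itself stop being copyable. A minimal
illustration (hypothetical names, for exposition only):

    #include <atomic>
    #include <mutex>

    struct Inner {
      std::atomic<bool> flag{false};
      std::mutex m;
      Inner() = default;
      Inner(const Inner&) {}  // fresh flag, fresh mutex
      Inner& operator=(const Inner&) { return *this; }
    };

    struct Holder {
      Inner inner;  // Holder stays copyable thanks to Inner's boilerplate
    };

    int main() {
      Holder a;
      Holder b = a;  // OK: b.inner starts out with flag == false
      (void)b;
      return 0;
    }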
31 changes: 31 additions & 0 deletions cpp/src/arrow/scalar_test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <chrono>
+#include <future>
 #include <limits>
 #include <memory>
 #include <ostream>
@@ -1984,4 +1985,34 @@ TEST_F(TestExtensionScalar, ValidateErrors) {
   AssertValidationFails(scalar);
 }
 
+TEST(TestScalarFillArraySpan, ParallelFill) {
+  for (auto scalar : {
+           ScalarFromJSON(utf8(), R"("test data")"),
+           // ScalarFromJSON(utf8_view(), R"("test data")"),
+           // ScalarFromJSON(list(int8()), "[1, 2, 3]"),
+           // ScalarFromJSON(list_view(int8()), "[1, 2, 3]"),
+           // TODO: more coming.
+       }) {
+    ARROW_SCOPED_TRACE("Scalar: ", scalar->ToString());
+
+    // Lambda to fill an ArraySpan from the scalar (and thereby fill the
+    // scalar's scratch space), then exercise the ArraySpan a bit.
+    auto array_span_from_scalar = [&scalar]() {
+      ArraySpan span;
+      span.FillFromScalar(*scalar);
+      ASSERT_TRUE(span.type->Equals(scalar->type));
+      ASSERT_EQ(span.length, 1);
+      auto values = span.GetValues<int32_t>(1);
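+      // Buffer 1 of the string scalar's span holds the int32 offsets written
+      // into the scratch space; the first offset is always 0.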
+      ASSERT_EQ(values[0], 0);
+    };
+
+    // Two concurrent calls to the lambda are just enough for TSAN to report a
+    // race condition.
+    auto fut1 = std::async(std::launch::async, array_span_from_scalar);
+    auto fut2 = std::async(std::launch::async, array_span_from_scalar);
+    fut1.get();
+    fut2.get();
+  }
+}
 
 }  // namespace arrow