Merged: changes from all 31 commits
6 changes: 4 additions & 2 deletions c_glib/arrow-glib/scalar.cpp
@@ -1063,7 +1063,8 @@ garrow_base_binary_scalar_get_value(GArrowBaseBinaryScalar *scalar)
   if (!priv->value) {
     const auto arrow_scalar = std::static_pointer_cast<arrow::BaseBinaryScalar>(
       garrow_scalar_get_raw(GARROW_SCALAR(scalar)));
-    priv->value = garrow_buffer_new_raw(&(arrow_scalar->value));
+    priv->value = garrow_buffer_new_raw(
+      const_cast<std::shared_ptr<arrow::Buffer> *>(&(arrow_scalar->value)));
   }
   return priv->value;
 }
@@ -1983,7 +1984,8 @@ garrow_base_list_scalar_get_value(GArrowBaseListScalar *scalar)
   if (!priv->value) {
     const auto arrow_scalar = std::static_pointer_cast<arrow::BaseListScalar>(
       garrow_scalar_get_raw(GARROW_SCALAR(scalar)));
-    priv->value = garrow_array_new_raw(&(arrow_scalar->value));
+    priv->value = garrow_array_new_raw(
+      const_cast<std::shared_ptr<arrow::Array> *>(&(arrow_scalar->value)));
   }
   return priv->value;
 }
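A note on the const_cast introduced above: taking the address of the scalar's value member now yields a pointer-to-const (presumably because scalars are meant to be immutable once constructed, which is the theme of this change), while garrow_buffer_new_raw() and garrow_array_new_raw() still take a mutable pointer that they only read through. A minimal sketch of the pattern, with hypothetical stand-in types rather than the real GLib API:

```cpp
#include <memory>

struct Buffer {};

// Stand-in for a scalar whose value is fixed at construction time.
struct BinaryScalarLike {
  const std::shared_ptr<Buffer> value;
};

// Hypothetical wrapper in the style of garrow_buffer_new_raw(): it takes a
// non-const pointer but only copies the shared_ptr (bumping the refcount).
void WrapBuffer(std::shared_ptr<Buffer>* buffer) {
  std::shared_ptr<Buffer> copy = *buffer;  // read-only use
}

void Example(const BinaryScalarLike& scalar) {
  // &scalar.value is a const std::shared_ptr<Buffer>*, so calling the
  // non-const API needs a const_cast, mirroring the change above.
  WrapBuffer(const_cast<std::shared_ptr<Buffer>*>(&scalar.value));
}
```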
36 changes: 36 additions & 0 deletions cpp/src/arrow/array/array_test.cc
@@ -22,6 +22,7 @@
 #include <cmath>
 #include <cstdint>
 #include <cstring>
+#include <future>
 #include <limits>
 #include <memory>
 #include <numeric>
@@ -823,6 +824,41 @@ TEST_F(TestArray, TestFillFromScalar) {
   }
 }
 
+// GH-40069: Data race when concurrently calling ArraySpan::FillFromScalar on the same
+// scalar instance.
+TEST_F(TestArray, TestConcurrentFillFromScalar) {
+  for (auto type : TestArrayUtilitiesAgainstTheseTypes()) {
+    ARROW_SCOPED_TRACE("type = ", type->ToString());
+    for (auto seed : {0u, 0xdeadbeef, 42u}) {
+      ARROW_SCOPED_TRACE("seed = ", seed);
+
+      Field field("", type, /*nullable=*/true,
+                  key_value_metadata({{"extension_allow_random_storage", "true"}}));
+      auto array = random::GenerateArray(field, 1, seed);
+
+      ASSERT_OK_AND_ASSIGN(auto scalar, array->GetScalar(0));
+
+      // Lambda to fill an ArraySpan from the scalar and use the ArraySpan a bit.
+      auto array_span_from_scalar = [&]() {
+        ArraySpan span(*scalar);
+        auto roundtripped_array = span.ToArray();
+        ASSERT_OK(roundtripped_array->ValidateFull());
+
+        AssertArraysEqual(*array, *roundtripped_array);
+        ASSERT_OK_AND_ASSIGN(auto roundtripped_scalar, roundtripped_array->GetScalar(0));
+        AssertScalarsEqual(*scalar, *roundtripped_scalar);
+      };
+
+      // Two concurrent calls to the lambda are just enough for TSAN to detect a race
+      // condition.
+      auto fut1 = std::async(std::launch::async, array_span_from_scalar);
+      auto fut2 = std::async(std::launch::async, array_span_from_scalar);
+      fut1.get();
+      fut2.get();
+    }
+  }
+}
+
 TEST_F(TestArray, ExtensionSpanRoundTrip) {
   // Other types are checked in MakeEmptyArray but MakeEmptyArray doesn't
   // work for extension types so we check that here
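The new test drives two threads through ArraySpan::FillFromScalar (via the ArraySpan constructor) on one shared scalar. The essence of the pre-fix race, reduced to a stand-alone sketch with simplified stand-in types rather than Arrow's classes; building with -fsanitize=thread should make TSAN flag the concurrent write:

```cpp
#include <cstdint>
#include <thread>

// Stand-in for a scalar carrying in-object scratch space that, before the
// fix, FillFromScalar wrote into on every call.
struct ScalarLike {
  int64_t length = 3;
  alignas(int64_t) mutable uint8_t scratch_space[16];
};

// Pre-fix pattern: each caller writes offsets into the *shared* scalar.
// Two threads doing this concurrently race on scratch_space, even though
// they write identical bytes.
void FillSpanFrom(const ScalarLike& s) {
  auto* offsets = reinterpret_cast<int32_t*>(s.scratch_space);
  offsets[0] = 0;
  offsets[1] = static_cast<int32_t>(s.length);  // racy write
}

int main() {
  ScalarLike scalar;
  std::thread t1([&] { FillSpanFrom(scalar); });
  std::thread t2([&] { FillSpanFrom(scalar); });
  t1.join();
  t2.join();
  return 0;
}
```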
90 changes: 40 additions & 50 deletions cpp/src/arrow/array/data.cc
@@ -283,25 +283,15 @@ void ArraySpan::SetMembers(const ArrayData& data) {
 
 namespace {
 
-template <typename offset_type>
-BufferSpan OffsetsForScalar(uint8_t* scratch_space, offset_type value_size) {
-  auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
-  offsets[0] = 0;
-  offsets[1] = static_cast<offset_type>(value_size);
-  static_assert(2 * sizeof(offset_type) <= 16);
-  return {scratch_space, sizeof(offset_type) * 2};
+BufferSpan OffsetsForScalar(uint8_t* scratch_space, int64_t offset_width) {
+  return {scratch_space, offset_width * 2};
 }
 
-template <typename offset_type>
 std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(uint8_t* scratch_space,
-                                                           offset_type value_size) {
+                                                           int64_t offset_width) {
   auto* offsets = scratch_space;
-  auto* sizes = scratch_space + sizeof(offset_type);
-  reinterpret_cast<offset_type*>(offsets)[0] = 0;
-  reinterpret_cast<offset_type*>(sizes)[0] = value_size;
-  static_assert(2 * sizeof(offset_type) <= 16);
-  return {BufferSpan{offsets, sizeof(offset_type)},
-          BufferSpan{sizes, sizeof(offset_type)}};
+  auto* sizes = scratch_space + offset_width;
+  return {BufferSpan{offsets, offset_width}, BufferSpan{sizes, offset_width}};
 }
 
 int GetNumBuffers(const DataType& type) {
@@ -415,26 +405,23 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
       data_size = scalar.value->size();
     }
     if (is_binary_like(type_id)) {
-      this->buffers[1] =
-          OffsetsForScalar(scalar.scratch_space_, static_cast<int32_t>(data_size));
+      const auto& binary_scalar = checked_cast<const BinaryScalar&>(value);
+      this->buffers[1] = OffsetsForScalar(binary_scalar.scratch_space_, sizeof(int32_t));
     } else {
       // is_large_binary_like
-      this->buffers[1] = OffsetsForScalar(scalar.scratch_space_, data_size);
+      const auto& large_binary_scalar = checked_cast<const LargeBinaryScalar&>(value);
+      this->buffers[1] =
+          OffsetsForScalar(large_binary_scalar.scratch_space_, sizeof(int64_t));
     }
     this->buffers[2].data = const_cast<uint8_t*>(data_buffer);
     this->buffers[2].size = data_size;
   } else if (type_id == Type::BINARY_VIEW || type_id == Type::STRING_VIEW) {
-    const auto& scalar = checked_cast<const BaseBinaryScalar&>(value);
+    const auto& scalar = checked_cast<const BinaryViewScalar&>(value);
 
     this->buffers[1].size = BinaryViewType::kSize;
     this->buffers[1].data = scalar.scratch_space_;
-    static_assert(sizeof(BinaryViewType::c_type) <= sizeof(scalar.scratch_space_));
-    auto* view = new (&scalar.scratch_space_) BinaryViewType::c_type;
     if (scalar.is_valid) {
-      *view = util::ToBinaryView(std::string_view{*scalar.value}, 0, 0);
       this->buffers[2] = internal::PackVariadicBuffers({&scalar.value, 1});
-    } else {
-      *view = {};
     }
   } else if (type_id == Type::FIXED_SIZE_BINARY) {
     const auto& scalar = checked_cast<const BaseBinaryScalar&>(value);
@@ -443,30 +430,36 @@
   } else if (is_var_length_list_like(type_id) || type_id == Type::FIXED_SIZE_LIST) {
     const auto& scalar = checked_cast<const BaseListScalar&>(value);
 
-    int64_t value_length = 0;
     this->child_data.resize(1);
     if (scalar.value != nullptr) {
       // When the scalar is null, scalar.value can also be null
       this->child_data[0].SetMembers(*scalar.value->data());
-      value_length = scalar.value->length();
     } else {
       // Even when the value is null, we still must populate the
       // child_data to yield a valid array. Tedious
       internal::FillZeroLengthArray(this->type->field(0)->type().get(),
                                     &this->child_data[0]);
     }
 
-    if (type_id == Type::LIST || type_id == Type::MAP) {
-      this->buffers[1] =
-          OffsetsForScalar(scalar.scratch_space_, static_cast<int32_t>(value_length));
+    if (type_id == Type::LIST) {
+      const auto& list_scalar = checked_cast<const ListScalar&>(value);
+      this->buffers[1] = OffsetsForScalar(list_scalar.scratch_space_, sizeof(int32_t));
+    } else if (type_id == Type::MAP) {
+      const auto& map_scalar = checked_cast<const MapScalar&>(value);
+      this->buffers[1] = OffsetsForScalar(map_scalar.scratch_space_, sizeof(int32_t));
    } else if (type_id == Type::LARGE_LIST) {
-      this->buffers[1] = OffsetsForScalar(scalar.scratch_space_, value_length);
+      const auto& large_list_scalar = checked_cast<const LargeListScalar&>(value);
+      this->buffers[1] =
+          OffsetsForScalar(large_list_scalar.scratch_space_, sizeof(int64_t));
    } else if (type_id == Type::LIST_VIEW) {
-      std::tie(this->buffers[1], this->buffers[2]) = OffsetsAndSizesForScalar(
-          scalar.scratch_space_, static_cast<int32_t>(value_length));
-    } else if (type_id == Type::LARGE_LIST_VIEW) {
+      const auto& list_view_scalar = checked_cast<const ListViewScalar&>(value);
       std::tie(this->buffers[1], this->buffers[2]) =
-          OffsetsAndSizesForScalar(scalar.scratch_space_, value_length);
+          OffsetsAndSizesForScalar(list_view_scalar.scratch_space_, sizeof(int32_t));
+    } else if (type_id == Type::LARGE_LIST_VIEW) {
+      const auto& large_list_view_scalar =
+          checked_cast<const LargeListViewScalar&>(value);
+      std::tie(this->buffers[1], this->buffers[2]) = OffsetsAndSizesForScalar(
+          large_list_view_scalar.scratch_space_, sizeof(int64_t));
    } else {
      DCHECK_EQ(type_id, Type::FIXED_SIZE_LIST);
      // FIXED_SIZE_LIST: does not have a second buffer
@@ -480,27 +473,19 @@
       this->child_data[i].FillFromScalar(*scalar.value[i]);
     }
   } else if (is_union(type_id)) {
-    // Dense union needs scratch space to store both offsets and a type code
-    struct UnionScratchSpace {
-      alignas(int64_t) int8_t type_code;
-      alignas(int64_t) uint8_t offsets[sizeof(int32_t) * 2];
-    };
-    static_assert(sizeof(UnionScratchSpace) <= sizeof(UnionScalar::scratch_space_));
-    auto* union_scratch_space = reinterpret_cast<UnionScratchSpace*>(
-        &checked_cast<const UnionScalar&>(value).scratch_space_);
-
     // First buffer is kept null since unions have no validity vector
     this->buffers[0] = {};
 
-    union_scratch_space->type_code = checked_cast<const UnionScalar&>(value).type_code;
-    this->buffers[1].data = reinterpret_cast<uint8_t*>(&union_scratch_space->type_code);
-    this->buffers[1].size = 1;
-
     this->child_data.resize(this->type->num_fields());
     if (type_id == Type::DENSE_UNION) {
       const auto& scalar = checked_cast<const DenseUnionScalar&>(value);
-      this->buffers[2] =
-          OffsetsForScalar(union_scratch_space->offsets, static_cast<int32_t>(1));
+      auto* union_scratch_space =
+          reinterpret_cast<UnionScalar::UnionScratchSpace*>(&scalar.scratch_space_);
+
+      this->buffers[1].data = reinterpret_cast<uint8_t*>(&union_scratch_space->type_code);
+      this->buffers[1].size = 1;
+
+      this->buffers[2] = OffsetsForScalar(union_scratch_space->offsets, sizeof(int32_t));
       // We can't "see" the other arrays in the union, but we put the "active"
       // union array in the right place and fill zero-length arrays for the
       // others
@@ -517,6 +502,12 @@
       }
     } else {
       const auto& scalar = checked_cast<const SparseUnionScalar&>(value);
+      auto* union_scratch_space =
+          reinterpret_cast<UnionScalar::UnionScratchSpace*>(&scalar.scratch_space_);
+
+      this->buffers[1].data = reinterpret_cast<uint8_t*>(&union_scratch_space->type_code);
+      this->buffers[1].size = 1;
+
       // Sparse union scalars have a full complement of child values even
       // though only one of them is relevant, so we just fill them in here
       for (int i = 0; i < static_cast<int>(this->child_data.size()); ++i) {
@@ -541,7 +532,6 @@
       e.null_count = 0;
       e.buffers[1].data = scalar.scratch_space_;
       e.buffers[1].size = sizeof(run_end);
-      reinterpret_cast<decltype(run_end)*>(scalar.scratch_space_)[0] = run_end;
     };
 
     switch (scalar.run_end_type()->id()) {
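The common thread in these hunks: FillFromScalar no longer writes offsets, view headers, or union type codes into scratch_space_ on every call. Each concrete scalar type is expected to have its scratch space filled once, when the scalar is constructed, and the helpers above merely wrap the pre-filled bytes in BufferSpans, so concurrent FillFromScalar calls become read-only. A hedged sketch of the construction-time half with stand-in types (Arrow's actual mechanism, e.g. UnionScalar::UnionScratchSpace, may differ in detail):

```cpp
#include <cstdint>
#include <cstring>

struct BufferSpan {
  uint8_t* data;
  int64_t size;
};

// Stand-in list scalar: offsets are written exactly once, when the scalar
// is built, never again on the FillFromScalar path.
struct ListScalarLike {
  alignas(int64_t) mutable uint8_t scratch_space_[16];

  explicit ListScalarLike(int32_t value_length) {
    const int32_t offsets[2] = {0, value_length};
    std::memcpy(scratch_space_, offsets, sizeof(offsets));
  }
};

// Mirrors the new OffsetsForScalar in the diff: no writes, just a view.
BufferSpan OffsetsForScalar(uint8_t* scratch_space, int64_t offset_width) {
  return {scratch_space, offset_width * 2};
}

int main() {
  ListScalarLike scalar(/*value_length=*/5);
  // Safe from any number of threads: read-only from here on.
  BufferSpan offsets = OffsetsForScalar(scalar.scratch_space_, sizeof(int32_t));
  return offsets.size == 8 ? 0 : 1;
}
```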
37 changes: 0 additions & 37 deletions cpp/src/arrow/compute/kernels/codegen_internal.h
@@ -369,43 +369,6 @@ struct UnboxScalar<Decimal256Type> {
   }
 };
 
-template <typename Type, typename Enable = void>
-struct BoxScalar;
-
-template <typename Type>
-struct BoxScalar<Type, enable_if_has_c_type<Type>> {
-  using T = typename GetOutputType<Type>::T;
-  static void Box(T val, Scalar* out) {
-    // Enables BoxScalar<Int64Type> to work on a (for example) Time64Scalar
-    T* mutable_data = reinterpret_cast<T*>(
-        checked_cast<::arrow::internal::PrimitiveScalarBase*>(out)->mutable_data());
-    *mutable_data = val;
-  }
-};
-
-template <typename Type>
-struct BoxScalar<Type, enable_if_base_binary<Type>> {
-  using T = typename GetOutputType<Type>::T;
-  using ScalarType = typename TypeTraits<Type>::ScalarType;
-  static void Box(T val, Scalar* out) {
-    checked_cast<ScalarType*>(out)->value = std::make_shared<Buffer>(val);
-  }
-};
-
-template <>
-struct BoxScalar<Decimal128Type> {
-  using T = Decimal128;
-  using ScalarType = Decimal128Scalar;
-  static void Box(T val, Scalar* out) { checked_cast<ScalarType*>(out)->value = val; }
-};
-
-template <>
-struct BoxScalar<Decimal256Type> {
-  using T = Decimal256;
-  using ScalarType = Decimal256Scalar;
-  static void Box(T val, Scalar* out) { checked_cast<ScalarType*>(out)->value = val; }
-};
-
 // A VisitArraySpanInline variant that calls its visitor function with logical
 // values, such as Decimal128 rather than std::string_view.
 
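BoxScalar existed solely to mutate an already-allocated Scalar in place; once ScalarMinMax (below) builds its result with MakeScalar, nothing uses it and the whole template family can go. For reference, a sketch of the replacement idiom using Arrow's public MakeScalar/MakeNullScalar helpers (an illustrative function, not code from this PR):

```cpp
#include <arrow/api.h>

#include <algorithm>
#include <cstdint>

// Build the result scalar immutably rather than boxing a value into a
// pre-allocated, mutable output scalar.
arrow::Result<std::shared_ptr<arrow::Scalar>> MinOf(int64_t a, int64_t b,
                                                    bool any_null) {
  if (any_null) {
    // Null result: a dedicated null scalar, no is_valid flag to flip.
    return arrow::MakeNullScalar(arrow::int64());
  }
  return arrow::MakeScalar(arrow::int64(), std::min(a, b));
}
```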
17 changes: 9 additions & 8 deletions cpp/src/arrow/compute/kernels/scalar_compare.cc
@@ -491,8 +491,9 @@ template <typename OutType, typename Op>
 struct ScalarMinMax {
   using OutValue = typename GetOutputType<OutType>::T;
 
-  static void ExecScalar(const ExecSpan& batch,
-                         const ElementWiseAggregateOptions& options, Scalar* out) {
+  static Result<std::shared_ptr<Scalar>> ExecScalar(
+      const ExecSpan& batch, const ElementWiseAggregateOptions& options,
+      std::shared_ptr<DataType> type) {
     // All arguments are scalar
     OutValue value{};
     bool valid = false;
@@ -502,8 +503,8 @@
       const Scalar& scalar = *arg.scalar;
       if (!scalar.is_valid) {
         if (options.skip_nulls) continue;
-        out->is_valid = false;
-        return;
+        valid = false;
+        break;
       }
       if (!valid) {
         value = UnboxScalar<OutType>::Unbox(scalar);
@@ -513,9 +514,10 @@
             value, UnboxScalar<OutType>::Unbox(scalar));
       }
     }
-    out->is_valid = valid;
     if (valid) {
-      BoxScalar<OutType>::Box(value, out);
+      return MakeScalar(std::move(type), std::move(value));
+    } else {
+      return MakeNullScalar(std::move(type));
    }
  }
 
@@ -537,8 +539,7 @@
     bool initialize_output = true;
     if (scalar_count > 0) {
       ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> temp_scalar,
-                            MakeScalar(out->type()->GetSharedPtr(), 0));
-      ExecScalar(batch, options, temp_scalar.get());
+                            ExecScalar(batch, options, out->type()->GetSharedPtr()));
       if (temp_scalar->is_valid) {
         const auto value = UnboxScalar<OutType>::Unbox(*temp_scalar);
         initialize_output = false;
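Net effect of the ScalarMinMax rewrite: ExecScalar no longer writes through a caller-supplied Scalar*, it returns a freshly built scalar as Result<std::shared_ptr<Scalar>>, and a null result comes from MakeNullScalar rather than flipping out->is_valid. The control flow, reduced to a self-contained sketch in which std::optional stands in for scalar validity (a hypothetical simplification, not Arrow's API):

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>
#include <vector>

// Miniature of the new ExecScalar contract: fold scalar inputs into one
// result; an empty optional plays the role of MakeNullScalar, an engaged
// one the role of MakeScalar(type, value).
std::optional<int64_t> ExecScalarMin(
    const std::vector<std::optional<int64_t>>& args, bool skip_nulls) {
  std::optional<int64_t> acc;
  for (const auto& arg : args) {
    if (!arg.has_value()) {
      if (skip_nulls) continue;
      return std::nullopt;  // mirrors `valid = false; break;` + MakeNullScalar
    }
    acc = acc.has_value() ? std::min(*acc, *arg) : *arg;
  }
  return acc;
}
```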