Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ set(ARROW_UTIL_SRCS
util/decimal.cc
util/delimiting.cc
util/dict_util.cc
util/fixed_width_internal.cc
util/float16.cc
util/formatting.cc
util/future.cc
Expand Down
30 changes: 17 additions & 13 deletions cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/fixed_width_internal.h"

namespace arrow {

Expand Down Expand Up @@ -158,9 +159,11 @@ class PrimitiveFilterImpl {
PrimitiveFilterImpl(const ArraySpan& values, const ArraySpan& filter,
FilterOptions::NullSelectionBehavior null_selection,
ArrayData* out_arr)
: byte_width_(values.type->byte_width()),
: byte_width_(util::FixedWidthInBytes(*values.type)),
values_is_valid_(values.buffers[0].data),
values_data_(values.buffers[1].data),
// No offset applied for boolean because it's a bitmap
values_data_(kIsBoolean ? values.buffers[1].data
: util::OffsetPointerOfFixedWidthValues(values)),
values_null_count_(values.null_count),
values_offset_(values.offset),
values_length_(values.length),
Expand All @@ -169,17 +172,13 @@ class PrimitiveFilterImpl {
if constexpr (kByteWidth >= 0 && !kIsBoolean) {
DCHECK_EQ(kByteWidth, byte_width_);
}
if constexpr (!kIsBoolean) {
// No offset applied for boolean because it's a bitmap
values_data_ += values.offset * byte_width();
}

DCHECK_EQ(out_arr->offset, 0);
if (out_arr->buffers[0] != nullptr) {
// May be unallocated if neither filter nor values contain nulls
out_is_valid_ = out_arr->buffers[0]->mutable_data();
}
out_data_ = out_arr->buffers[1]->mutable_data();
DCHECK_EQ(out_arr->offset, 0);
out_data_ = util::MutableFixedWidthValuesPointer(out_arr);
out_length_ = out_arr->length;
out_position_ = 0;
}
Expand Down Expand Up @@ -416,7 +415,7 @@ class PrimitiveFilterImpl {
out_position_ += length;
}

constexpr int32_t byte_width() const {
constexpr int64_t byte_width() const {
if constexpr (kByteWidth >= 0) {
return kByteWidth;
} else {
Expand All @@ -425,7 +424,7 @@ class PrimitiveFilterImpl {
}

private:
int32_t byte_width_;
int64_t byte_width_;
const uint8_t* values_is_valid_;
const uint8_t* values_data_;
int64_t values_null_count_;
Expand All @@ -439,6 +438,8 @@ class PrimitiveFilterImpl {
int64_t out_position_;
};

} // namespace

Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
const ArraySpan& values = batch[0].array;
const ArraySpan& filter = batch[1].array;
Expand Down Expand Up @@ -468,9 +469,10 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
// validity bitmap.
const bool allocate_validity = values.null_count != 0 || !filter_null_count_is_zero;

const int bit_width = values.type->bit_width();
RETURN_NOT_OK(PreallocatePrimitiveArrayData(ctx, output_length, bit_width,
allocate_validity, out_arr));
DCHECK(util::IsFixedWidthLike(values, /*force_null_count=*/false));
const int64_t bit_width = util::FixedWidthInBits(*values.type);
RETURN_NOT_OK(util::internal::PreallocateFixedWidthArrayData(
ctx, output_length, /*source=*/values, allocate_validity, out_arr));

switch (bit_width) {
case 1:
Expand Down Expand Up @@ -505,6 +507,8 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
return Status::OK();
}

namespace {

// ----------------------------------------------------------------------
// Optimized filter for base binary types (32-bit and 64-bit)

Expand Down
100 changes: 32 additions & 68 deletions cpp/src/arrow/compute/kernels/vector_selection_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "arrow/util/bit_block_counter.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/fixed_width_internal.h"
#include "arrow/util/int_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/ree_util.h"
Expand Down Expand Up @@ -65,24 +66,6 @@ void RegisterSelectionFunction(const std::string& name, FunctionDoc doc,
DCHECK_OK(registry->AddFunction(std::move(func)));
}

/// \brief Set up the length and buffers of `out` for a primitive output array.
///
/// `out->length` is set to `length` and `out->buffers` is resized to two slots
/// (validity at index 0, values at index 1).
///
/// \param[in] ctx kernel context used for every allocation
/// \param[in] length number of elements in the output
/// \param[in] bit_width width of one value in bits; a width of 1 allocates the
///            values buffer as a bitmap, any other width allocates
///            BytesForBits(length * bit_width) bytes
/// \param[in] allocate_validity whether a validity bitmap is allocated; when
///            false, buffers[0] is not assigned here
/// \param[out] out the ArrayData to populate
/// \return the first allocation error encountered, otherwise Status::OK()
Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit_width,
                                     bool allocate_validity, ArrayData* out) {
  out->length = length;
  out->buffers.resize(2);

  // Validity bitmap is optional.
  if (allocate_validity) {
    ARROW_ASSIGN_OR_RAISE(out->buffers[0], ctx->AllocateBitmap(length));
  }

  // Values buffer: byte-addressed unless the values are single bits.
  if (bit_width != 1) {
    const auto values_size_in_bytes = bit_util::BytesForBits(length * bit_width);
    ARROW_ASSIGN_OR_RAISE(out->buffers[1], ctx->Allocate(values_size_in_bytes));
    return Status::OK();
  }
  ARROW_ASSIGN_OR_RAISE(out->buffers[1], ctx->AllocateBitmap(length));
  return Status::OK();
}

namespace {

/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
Expand Down Expand Up @@ -564,39 +547,6 @@ struct VarBinarySelectionImpl : public Selection<VarBinarySelectionImpl<Type>, T
}
};

/// \brief Selection implementation for fixed-size binary values.
///
/// Selected values are copied byte-for-byte into `data_builder`; each null
/// output slot is filled with `byte_width` zero bytes. Finish() moves the
/// accumulated bytes into the output's values buffer (buffers[1]).
struct FSBSelectionImpl : public Selection<FSBSelectionImpl, FixedSizeBinaryType> {
  using Base = Selection<FSBSelectionImpl, FixedSizeBinaryType>;
  LIFT_BASE_MEMBERS();

  // Accumulates the raw bytes of the output values.
  TypedBufferBuilder<uint8_t> data_builder;

  FSBSelectionImpl(KernelContext* ctx, const ExecSpan& batch, int64_t output_length,
                   ExecResult* out)
      : Base(ctx, batch, output_length, out), data_builder(ctx->memory_pool()) {}

  /// Produce the output bytes by driving `Adapter` with one callback for
  /// selected (valid) indices and one for null output slots.
  template <typename Adapter>
  Status GenerateOutput() {
    FixedSizeBinaryArray fsb_array(this->values.ToArrayData());
    const int32_t width = fsb_array.byte_width();

    // Reserve everything up front so the callbacks can use UnsafeAppend.
    RETURN_NOT_OK(data_builder.Reserve(width * output_length));

    Adapter adapter(this);

    auto emit_value = [&](int64_t index) {
      const auto view = fsb_array.GetView(index);
      const auto* bytes = reinterpret_cast<const uint8_t*>(view.data());
      data_builder.UnsafeAppend(bytes, width);
      return Status::OK();
    };
    auto emit_null = [&]() {
      // Null slots still occupy `width` bytes in the values buffer.
      data_builder.UnsafeAppend(width, static_cast<uint8_t>(0x00));
      return Status::OK();
    };

    return adapter.Generate(emit_value, emit_null);
  }

  Status Finish() override { return data_builder.Finish(&out->buffers[1]); }
};

template <typename Type>
struct ListSelectionImpl : public Selection<ListSelectionImpl<Type>, Type> {
using offset_type = typename Type::offset_type;
Expand Down Expand Up @@ -909,6 +859,24 @@ Status LargeListFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
}

/// \brief Filter kernel entry point for fixed-size list arrays.
///
/// Delegates to PrimitiveFilterExec when the values are fixed-width-like, do
/// not bottom out in a DICTIONARY type, and have a strictly positive byte
/// width. Everything else goes through the generic FSLSelectionImpl path.
Status FSLFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
  const ArraySpan& values = batch[0].array;

  // DICTIONARY is fixed-width but not supported by PrimitiveFilterExec.
  auto is_not_dictionary = [](auto& fixed_width_type) {
    return fixed_width_type.id() != Type::DICTIONARY;
  };

  // If a FixedSizeList wraps a fixed-width type we can, in some cases, use
  // PrimitiveFilterExec for a fixed-size list array.
  const bool fixed_width_like =
      util::IsFixedWidthLike(values, /*force_null_count=*/true,
                             /*extra_predicate=*/is_not_dictionary);

  // 0 is a valid byte width for FixedSizeList, but PrimitiveFilterExec
  // might not handle it correctly, so only positive widths take the fast path.
  if (fixed_width_like && util::FixedWidthInBytes(*values.type) > 0) {
    return PrimitiveFilterExec(ctx, batch, out);
  }
  return FilterExec<FSLSelectionImpl>(ctx, batch, out);
}

Expand Down Expand Up @@ -942,23 +910,6 @@ Status LargeVarBinaryTakeExec(KernelContext* ctx, const ExecSpan& batch,
return TakeExec<VarBinarySelectionImpl<LargeBinaryType>>(ctx, batch, out);
}

/// \brief Take kernel entry point for fixed-size binary values.
///
/// Byte widths of 1, 2, 4, 8, 16 and 32 are routed to PrimitiveTakeExec
/// (presumably faster); every other width uses the FSBSelectionImpl path.
Status FSBTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
  const auto width = batch[0].array.type->byte_width();

  const bool primitive_friendly = width == 1 || width == 2 || width == 4 ||
                                  width == 8 || width == 16 || width == 32;
  if (primitive_friendly) {
    return PrimitiveTakeExec(ctx, batch, out);
  }
  return TakeExec<FSBSelectionImpl>(ctx, batch, out);
}

/// \brief Take kernel entry point for list arrays; forwards to the generic
/// TakeExec driver instantiated with ListSelectionImpl<ListType>.
Status ListTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
  using Impl = ListSelectionImpl<ListType>;
  return TakeExec<Impl>(ctx, batch, out);
}
Expand All @@ -968,6 +919,19 @@ Status LargeListTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult*
}

/// \brief Take kernel entry point for fixed-size list arrays.
///
/// Delegates to FixedWidthTakeExec when the values are fixed-width-like and do
/// not bottom out in a DICTIONARY type; otherwise uses the generic
/// FSLSelectionImpl path.
Status FSLTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
  const ArraySpan& values = batch[0].array;

  // DICTIONARY is fixed-width but not supported by FixedWidthTakeExec.
  auto is_not_dictionary = [](auto& fixed_width_type) {
    return fixed_width_type.id() != Type::DICTIONARY;
  };

  // If a FixedSizeList wraps a fixed-width type we can, in some cases, use
  // FixedWidthTakeExec for a fixed-size list array.
  const bool fixed_width_like =
      util::IsFixedWidthLike(values, /*force_null_count=*/true,
                             /*extra_predicate=*/is_not_dictionary);
  if (!fixed_width_like) {
    return TakeExec<FSLSelectionImpl>(ctx, batch, out);
  }
  return FixedWidthTakeExec(ctx, batch, out);
}

Expand Down
10 changes: 2 additions & 8 deletions cpp/src/arrow/compute/kernels/vector_selection_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ void RegisterSelectionFunction(const std::string& name, FunctionDoc doc,
const FunctionOptions* default_options,
FunctionRegistry* registry);

/// \brief Allocate an ArrayData for a primitive array with a given length and bit width
///
/// \param[in] bit_width 1 or a multiple of 8
Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit_width,
bool allocate_validity, ArrayData* out);

/// \brief Callback type for VisitPlainxREEFilterOutputSegments.
///
/// position is the logical position in the values array relative to its offset.
Expand All @@ -70,6 +64,7 @@ void VisitPlainxREEFilterOutputSegments(
FilterOptions::NullSelectionBehavior null_selection,
const EmitREEFilterSegment& emit_segment);

Status PrimitiveFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status ListFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status LargeListFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status FSLFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Expand All @@ -78,8 +73,7 @@ Status MapFilterExec(KernelContext*, const ExecSpan&, ExecResult*);

Status VarBinaryTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status LargeVarBinaryTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status PrimitiveTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status FSBTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status FixedWidthTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status ListTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status LargeListTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status FSLTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Expand Down
Loading