diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc
index d0d67e76fb2..d0dec5b00c6 100644
--- a/cpp/src/arrow/compute/api_vector.cc
+++ b/cpp/src/arrow/compute/api_vector.cc
@@ -22,7 +22,6 @@
 #include
 #include "arrow/compute/exec.h"
-#include "arrow/compute/kernels/vector_selection_internal.h"
 #include "arrow/datum.h"
 #include "arrow/record_batch.h"
 #include "arrow/result.h"
diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h
index 84086802f18..8aefbcf1ccf 100644
--- a/cpp/src/arrow/compute/api_vector.h
+++ b/cpp/src/arrow/compute/api_vector.h
@@ -65,7 +65,12 @@ Result<Datum> Filter(const Datum& values, const Datum& filter,
                      ExecContext* ctx = NULLPTR);
 
 struct ARROW_EXPORT TakeOptions : public FunctionOptions {
-  static TakeOptions Defaults() { return TakeOptions{}; }
+  explicit TakeOptions(bool boundscheck = true) : boundscheck(boundscheck) {}
+
+  bool boundscheck = true;
+  static TakeOptions Boundscheck() { return TakeOptions(true); }
+  static TakeOptions NoBoundscheck() { return TakeOptions(false); }
+  static TakeOptions Defaults() { return Boundscheck(); }
 };
 
 /// \brief Take from an array of values at indices in another array
diff --git a/cpp/src/arrow/compute/benchmark_util.h b/cpp/src/arrow/compute/benchmark_util.h
index 77149f91aab..7731b98f0f1 100644
--- a/cpp/src/arrow/compute/benchmark_util.h
+++ b/cpp/src/arrow/compute/benchmark_util.h
@@ -56,9 +56,10 @@ void BenchmarkSetArgsWithSizes(benchmark::internal::Benchmark* bench,
                                const std::vector<int64_t>& sizes = kMemorySizes) {
   bench->Unit(benchmark::kMicrosecond);
 
+  // 0 is treated as "no nulls"
   for (const auto size : sizes) {
     for (const auto inverse_null_proportion :
-         std::vector<int64_t>({10000, 100, 10, 2, 1})) {
+         std::vector<int64_t>({10000, 100, 50, 10, 2, 1, 0})) {
       bench->Args({static_cast<int64_t>(size), inverse_null_proportion});
     }
   }
@@ -80,12 +81,16 @@ struct RegressionArgs {
   const int64_t size;
 
   // proportion of nulls in generated arrays
-  const double null_proportion;
+  double null_proportion;
 
   explicit RegressionArgs(benchmark::State& state)
-      : size(state.range(0)),
-        null_proportion(std::min(1., 1. / static_cast<double>(state.range(1)))),
-        state_(state) {}
+      : size(state.range(0)), null_proportion(), state_(state) {
+    if (state.range(1) == 0) {
+      this->null_proportion = 0.0;
+    } else {
+      this->null_proportion =
+          std::min(1., 1. / static_cast<double>(state.range(1)));
+    }
+  }
 
   ~RegressionArgs() {
     state_.counters["size"] = static_cast<double>(size);
diff --git a/cpp/src/arrow/compute/kernel.cc b/cpp/src/arrow/compute/kernel.cc
index 1a6f1bd1031..4747e63c159 100644
--- a/cpp/src/arrow/compute/kernel.cc
+++ b/cpp/src/arrow/compute/kernel.cc
@@ -26,6 +26,7 @@
 #include "arrow/compute/exec.h"
 #include "arrow/compute/util_internal.h"
 #include "arrow/result.h"
+#include "arrow/type_traits.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/hash_util.h"
@@ -96,7 +97,6 @@ class SameTypeIdMatcher : public TypeMatcher {
     if (this == &other) {
       return true;
     }
-
     auto casted = dynamic_cast<const SameTypeIdMatcher*>(&other);
     if (casted == nullptr) {
       return false;
@@ -150,6 +150,86 @@ std::shared_ptr<TypeMatcher> TimestampUnit(TimeUnit::type unit) {
   return std::make_shared(unit);
 }
 
+class IntegerMatcher : public TypeMatcher {
+ public:
+  IntegerMatcher() {}
+
+  bool Matches(const DataType& type) const override { return is_integer(type.id()); }
+
+  bool Equals(const TypeMatcher& other) const override {
+    if (this == &other) {
+      return true;
+    }
+    auto casted = dynamic_cast<const IntegerMatcher*>(&other);
+    return casted != nullptr;
+  }
+
+  std::string ToString() const override { return "integer"; }
+};
+
+std::shared_ptr<TypeMatcher> Integer() { return std::make_shared<IntegerMatcher>(); }
+
+class PrimitiveMatcher : public TypeMatcher {
+ public:
+  PrimitiveMatcher() {}
+
+  bool Matches(const DataType& type) const override { return is_primitive(type.id()); }
+
+  bool Equals(const TypeMatcher& other) const override {
+    if (this == &other) {
+      return true;
+    }
+    auto casted = dynamic_cast<const PrimitiveMatcher*>(&other);
+    return casted != nullptr;
+  }
+
+  std::string ToString() const override { return "primitive"; }
+};
+
+std::shared_ptr<TypeMatcher> Primitive() { return std::make_shared<PrimitiveMatcher>(); }
+
+class BinaryLikeMatcher : public TypeMatcher {
+ public:
+  BinaryLikeMatcher() {}
+
+  bool Matches(const DataType& type) const override { return is_binary_like(type.id()); }
+
+  bool Equals(const TypeMatcher& other) const override {
+    if (this == &other) {
+      return true;
+    }
+    auto casted = dynamic_cast<const BinaryLikeMatcher*>(&other);
+    return casted != nullptr;
+  }
+  std::string ToString() const override { return "binary-like"; }
+};
+
+std::shared_ptr<TypeMatcher> BinaryLike() {
+  return std::make_shared<BinaryLikeMatcher>();
+}
+
+class LargeBinaryLikeMatcher : public TypeMatcher {
+ public:
+  LargeBinaryLikeMatcher() {}
+
+  bool Matches(const DataType& type) const override {
+    return is_large_binary_like(type.id());
+  }
+
+  bool Equals(const TypeMatcher& other) const override {
+    if (this == &other) {
+      return true;
+    }
+    auto casted = dynamic_cast<const LargeBinaryLikeMatcher*>(&other);
+    return casted != nullptr;
+  }
+  std::string ToString() const override { return "large-binary-like"; }
+};
+
+std::shared_ptr<TypeMatcher> LargeBinaryLike() {
+  return std::make_shared<LargeBinaryLikeMatcher>();
+}
+
 }  // namespace match
 
 // ----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index bdd2e9d1079..059a7c2001a 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -150,6 +150,19 @@ ARROW_EXPORT std::shared_ptr<TypeMatcher> SameTypeId(Type::type type_id);
 /// zones can be different.
 ARROW_EXPORT std::shared_ptr<TypeMatcher> TimestampUnit(TimeUnit::type unit);
 
+/// \brief Match any integer type
+ARROW_EXPORT std::shared_ptr<TypeMatcher> Integer();
+
+/// Match types using 32-bit varbinary representation
+ARROW_EXPORT std::shared_ptr<TypeMatcher> BinaryLike();
+
+/// Match types using 64-bit varbinary representation
+ARROW_EXPORT std::shared_ptr<TypeMatcher> LargeBinaryLike();
+
+/// \brief Match any primitive type (boolean or any type representable as a C
+/// type)
+ARROW_EXPORT std::shared_ptr<TypeMatcher> Primitive();
+
 }  // namespace match
 
 /// \brief An object used for type- and shape-checking arguments to be passed
diff --git a/cpp/src/arrow/compute/kernels/vector_filter.cc b/cpp/src/arrow/compute/kernels/vector_filter.cc
index db21d402e35..f8e26c58c31 100644
--- a/cpp/src/arrow/compute/kernels/vector_filter.cc
+++ b/cpp/src/arrow/compute/kernels/vector_filter.cc
@@ -22,136 +22,670 @@
 #include "arrow/compute/kernels/vector_selection_internal.h"
 #include "arrow/record_batch.h"
 #include "arrow/result.h"
+#include "arrow/util/bit_block_counter.h"
 #include "arrow/visitor_inline.h"
 
 namespace arrow {
+
+using internal::BinaryBitBlockCounter;
+using internal::BitBlockCount;
+using internal::BitBlockCounter;
+using internal::OptionalBitBlockCounter;
+
 namespace compute {
 namespace internal {
 
-// IndexSequence which yields the indices of positions in a BooleanArray
-// which are either null or true
-template <FilterOptions::NullSelectionBehavior NullSelectionBehavior>
-class FilterIndexSequence {
- public:
-  // constexpr so we'll never instantiate bounds checking
-  constexpr bool never_out_of_bounds() const { return true; }
-  void set_never_out_of_bounds() {}
+using FilterState = OptionsWrapper<FilterOptions>;
 
-  constexpr FilterIndexSequence() = default;
+int64_t FilterOutputSize(FilterOptions::NullSelectionBehavior null_selection,
+                         const Array& arr) {
+  const auto& filter = checked_cast<const BooleanArray&>(arr);
+  const uint8_t* data_bitmap = arr.data()->buffers[1]->data();
+  const int64_t offset = arr.offset();
+  int64_t size = 0;
+  if (filter.null_count() > 0) {
+    const uint8_t* valid_bitmap = arr.null_bitmap_data();
+    if (null_selection == FilterOptions::EMIT_NULL) {
+      BitBlockCounter bit_counter(data_bitmap, offset, arr.length());
+      int64_t position = 0;
+      while (true) {
+        BitBlockCount block = bit_counter.NextWord();
+        if (block.length == 0) {
+          break;
+        }
+        if (block.popcount == block.length) {
+          // The whole block is included
+          size += block.length;
+        } else {
+          // If the filter is set or the filter is null, we include it in the
+          // result
+          for (int64_t i = 0; i < block.length; ++i) {
+            if (BitUtil::GetBit(data_bitmap, offset + position + i) ||
+                !BitUtil::GetBit(valid_bitmap, offset + position + i)) {
+              ++size;
+            }
+          }
+        }
+        position += block.length;
+      }
+    } else {
+      // FilterOptions::DROP. Values must be valid and on/true, so we can
+      // use the binary block counter.
+      BinaryBitBlockCounter bit_counter(data_bitmap, offset, valid_bitmap, offset,
+                                        arr.length());
+      while (true) {
+        BitBlockCount block = bit_counter.NextAndWord();
+        if (block.length == 0) {
+          break;
+        }
+        size += block.popcount;
+      }
+    }
+  } else {
+    // The filter has no nulls, so we plow through it as fast as possible.
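+    // NextFourWords() consumes the filter bitmap up to 256 bits at a time;
+    // summing block.popcount over all blocks counts the set bits, which is
+    // exactly the output size since there are no nulls to emit or drop.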
+    BitBlockCounter bit_counter(data_bitmap, offset, arr.length());
+    while (true) {
+      BitBlockCount block = bit_counter.NextFourWords();
+      if (block.length == 0) {
+        break;
+      }
+      size += block.popcount;
+    }
+  }
+  return size;
+}
 
-  FilterIndexSequence(const BooleanArray& filter, int64_t out_length)
-      : filter_(&filter), out_length_(out_length) {}
+static int GetBitWidth(const DataType& type) {
+  return checked_cast<const FixedWidthType&>(type).bit_width();
+}
 
-  std::pair<int64_t, bool> Next() {
-    if (NullSelectionBehavior == FilterOptions::DROP) {
-      // skip until an index is found at which the filter is true
-      while (filter_->IsNull(index_) || !filter_->Value(index_)) {
-        ++index_;
-      }
-      return std::make_pair(index_++, true);
+// ----------------------------------------------------------------------
+
+/// \brief The Filter implementation for primitive (fixed-width) types does
+/// not use the logical Arrow type but rather the physical C type. This way
+/// we only generate one filter function for each byte width.
+template <typename IndexCType, typename ValueCType>
+struct PrimitiveImpl {
+  static void Exec(const PrimitiveTakeArgs& args, Datum* out_datum) {
+    auto values = reinterpret_cast<const ValueCType*>(args.values);
+    auto values_bitmap = args.values_bitmap;
+    auto values_offset = args.values_offset;
+
+    auto indices = reinterpret_cast<const IndexCType*>(args.indices);
+    auto indices_bitmap = args.indices_bitmap;
+    auto indices_offset = args.indices_offset;
+
+    ArrayData* out_arr = out_datum->mutable_array();
+    auto out = out_arr->GetMutableValues<ValueCType>(1);
+    auto out_bitmap = out_arr->buffers[0]->mutable_data();
+    auto out_offset = out_arr->offset;
+
+    // If either the values or indices have nulls, we preemptively zero out
+    // the out validity bitmap so that we don't have to use ClearBit in each
+    // iteration for nulls.
+    if (args.values_null_count > 0 || args.indices_null_count > 0) {
+      BitUtil::SetBitsTo(out_bitmap, out_offset, args.indices_length, false);
     }
-    // skip until an index is found at which the filter is either null or true
-    while (filter_->IsValid(index_) && !filter_->Value(index_)) {
-      ++index_;
+
+    OptionalBitBlockCounter indices_bit_counter(indices_bitmap, indices_offset,
+                                                args.indices_length);
+    int64_t position = 0;
+    int64_t valid_count = 0;
+    while (true) {
+      BitBlockCount block = indices_bit_counter.NextBlock();
+      if (block.length == 0) {
+        // All indices processed.
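+        // (NextBlock() returns a zero-length block only once all of
+        // args.indices_length has been scanned, so this is the loop's sole
+        // exit.)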
+ break; + } + if (args.values_null_count == 0) { + // Values are never null, so things are easier + valid_count += block.popcount; + if (block.popcount == block.length) { + // Fastest path: neither values nor index nulls + BitUtil::SetBitsTo(out_bitmap, out_offset + position, block.length, true); + for (int64_t i = 0; i < block.length; ++i) { + out[position] = values[indices[position]]; + ++position; + } + } else if (block.popcount > 0) { + // Slow path: some indices but not all are null + for (int64_t i = 0; i < block.length; ++i) { + if (BitUtil::GetBit(indices_bitmap, indices_offset + position)) { + // index is not null + BitUtil::SetBit(out_bitmap, out_offset + position); + out[position] = values[indices[position]]; + } + ++position; + } + } + } else { + // Values have nulls, so we must do random access into the values bitmap + if (block.popcount == block.length) { + // Faster path: indices are not null but values may be + for (int64_t i = 0; i < block.length; ++i) { + if (BitUtil::GetBit(values_bitmap, values_offset + indices[position])) { + // value is not null + out[position] = values[indices[position]]; + BitUtil::SetBit(out_bitmap, out_offset + position); + ++valid_count; + } + ++position; + } + } else if (block.popcount > 0) { + // Slow path: some but not all indices are null. Since we are doing + // random access in general we have to check the value nullness one by + // one. + for (int64_t i = 0; i < block.length; ++i) { + if (BitUtil::GetBit(indices_bitmap, indices_offset + position)) { + // index is not null + if (BitUtil::GetBit(values_bitmap, values_offset + indices[position])) { + // value is not null + out[position] = values[indices[position]]; + BitUtil::SetBit(out_bitmap, out_offset + position); + ++valid_count; + } + } + ++position; + } + } + } } - bool is_valid = filter_->IsValid(index_); - return std::make_pair(index_++, is_valid); + out_arr->null_count = out_arr->length - valid_count; } +}; - int64_t length() const { return out_length_; } +template +struct BooleanImpl { + static void Exec(const PrimitiveTakeArgs& args, Datum* out_datum) { + auto values = args.values; + auto values_bitmap = args.values_bitmap; + auto values_offset = args.values_offset; - int64_t null_count() const { - if (NullSelectionBehavior == FilterOptions::DROP) { - return 0; + auto indices = reinterpret_cast(args.indices); + auto indices_bitmap = args.indices_bitmap; + auto indices_offset = args.indices_offset; + + ArrayData* out_arr = out_datum->mutable_array(); + auto out = out_arr->buffers[1]->mutable_data(); + auto out_bitmap = out_arr->buffers[0]->mutable_data(); + auto out_offset = out_arr->offset; + + // If either the values or indices have nulls, we preemptively zero out the + // out validity bitmap so that we don't have to use ClearBit in each + // iteration for nulls. + if (args.values_null_count > 0 || args.indices_null_count > 0) { + BitUtil::SetBitsTo(out_bitmap, out_offset, args.indices_length, false); } - return filter_->null_count(); - } - private: - const BooleanArray* filter_ = nullptr; - int64_t index_ = 0, out_length_ = -1; -}; + auto PlaceDataBit = [&](int64_t loc, IndexCType index) { + BitUtil::SetBitTo(out, out_offset + loc, + BitUtil::GetBit(values, values_offset + index)); + }; -int64_t FilterOutputSize(FilterOptions::NullSelectionBehavior null_selection, - const Array& arr) { - const auto& filter = checked_cast(arr); - // TODO(bkietz) this can be optimized. 
Use Bitmap::VisitWords - int64_t size = 0; - if (null_selection == FilterOptions::EMIT_NULL) { - for (auto i = 0; i < filter.length(); ++i) { - if (filter.IsNull(i) || filter.Value(i)) { - ++size; + OptionalBitBlockCounter indices_bit_counter(indices_bitmap, indices_offset, + args.indices_length); + int64_t position = 0; + int64_t valid_count = 0; + while (true) { + BitBlockCount block = indices_bit_counter.NextBlock(); + if (block.length == 0) { + // All indices processed. + break; } - } - } else { - for (auto i = 0; i < filter.length(); ++i) { - if (filter.IsValid(i) && filter.Value(i)) { - ++size; + if (args.values_null_count == 0) { + // Values are never null, so things are easier + valid_count += block.popcount; + if (block.popcount == block.length) { + // Fastest path: neither values nor index nulls + BitUtil::SetBitsTo(out_bitmap, out_offset + position, block.length, true); + for (int64_t i = 0; i < block.length; ++i) { + PlaceDataBit(position, indices[position]); + ++position; + } + } else if (block.popcount > 0) { + // Slow path: some but not all indices are null + for (int64_t i = 0; i < block.length; ++i) { + if (BitUtil::GetBit(indices_bitmap, indices_offset + position)) { + // index is not null + BitUtil::SetBit(out_bitmap, out_offset + position); + PlaceDataBit(position, indices[position]); + } + ++position; + } + } + } else { + // Values have nulls, so we must do random access into the values bitmap + if (block.popcount == block.length) { + // Faster path: indices are not null but values may be + for (int64_t i = 0; i < block.length; ++i) { + if (BitUtil::GetBit(values_bitmap, values_offset + indices[position])) { + // value is not null + BitUtil::SetBit(out_bitmap, out_offset + position); + PlaceDataBit(position, indices[position]); + ++valid_count; + } + ++position; + } + } else if (block.popcount > 0) { + // Slow path: some but not all indices are null. Since we are doing + // random access in general we have to check the value nullness one by + // one. 
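+          // Most general path: consult the index validity bit first, then
+          // the value validity bit, and set the output bit only when both
+          // are set.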
+ for (int64_t i = 0; i < block.length; ++i) { + if (BitUtil::GetBit(indices_bitmap, indices_offset + position)) { + // index is not null + if (BitUtil::GetBit(values_bitmap, values_offset + indices[position])) { + // value is not null + PlaceDataBit(position, indices[position]); + BitUtil::SetBit(out_bitmap, out_offset + position); + ++valid_count; + } + } + ++position; + } + } } } + out_arr->null_count = out_arr->length - valid_count; } - return size; -} - -struct FilterState : public KernelState { - explicit FilterState(FilterOptions options) : options(std::move(options)) {} - FilterOptions options; }; -std::unique_ptr InitFilter(KernelContext*, const KernelInitArgs& args) { - FilterOptions options; - if (args.options == nullptr) { - options = FilterOptions::Defaults(); +Status PreallocateFilter(KernelContext* ctx, int64_t length, int bit_width, Datum* out) { + // Preallocate memory + ArrayData* out_arr = out->mutable_array(); + out_arr->length = length; + out_arr->buffers.resize(2); + ARROW_ASSIGN_OR_RAISE(out_arr->buffers[0], ctx->AllocateBitmap(length)); + if (bit_width == 1) { + ARROW_ASSIGN_OR_RAISE(out_arr->buffers[1], ctx->AllocateBitmap(length)); } else { - options = *static_cast(args.options); + ARROW_ASSIGN_OR_RAISE(out_arr->buffers[1], ctx->Allocate(length * bit_width / 8)); } - return std::unique_ptr(new FilterState(std::move(options))); + return Status::OK(); } -template -struct FilterFunctor { - using ArrayType = typename TypeTraits::ArrayType; - - template - static void ExecImpl(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - using IS = FilterIndexSequence; - ArrayType values(batch[0].array()); - BooleanArray filter(batch[1].array()); - const int64_t output_size = FilterOutputSize(NullSelection, filter); - std::shared_ptr result; - KERNEL_RETURN_IF_ERROR(ctx, Select(ctx, values, IS(filter, output_size), &result)); - out->value = result->data(); - } - - static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const auto& state = checked_cast(*ctx->state()); - if (state.options.null_selection_behavior == FilterOptions::EMIT_NULL) { - ExecImpl(ctx, batch, out); - } else { - ExecImpl(ctx, batch, out); +static void PrimitiveExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const auto& state = checked_cast(*ctx->state()); + KERNEL_RETURN_IF_ERROR( + ctx, PreallocateFilter(ctx, output_length, args.values_bit_width, out)); + switch (args.values_bit_width) { + case 1: + return BooleanImpl(batch, out); + case 8: + return PrimitiveImpl(batch, out); + case 16: + return PrimitiveImpl(batch, out); + case 32: + return PrimitiveImpl(batch, out); + case 64: + return PrimitiveImpl(batch, out); + default: + DCHECK(false) << "Invalid values byte width"; + break; + } +} + +static void NullExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + out->value = std::make_shared(batch.length)->data(); +} + +static void DictionaryExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const auto& state = checked_cast(*ctx->state()); + DictionaryArray values(batch[0].array()); + Result result = + Filter(Datum(values.indices()), batch[1], state.options, ctx->exec_context()); + if (!result.ok()) { + ctx->SetStatus(result.status()); + return; + } + DictionaryArray filtered_values(values.type(), (*result).make_array(), + values.dictionary()); + out->value = filtered_values.data(); +} + +static void ExtensionExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const auto& state = checked_cast(*ctx->state()); + ExtensionArray 
values(batch[0].array()); + Result result = + Filter(Datum(values.storage()), batch[1], state.options, ctx->exec_context()); + if (!result.ok()) { + ctx->SetStatus(result.status()); + return; + } + ExtensionArray filtered_values(values.type(), (*result).make_array()); + out->value = filtered_values.data(); +} + +// ---------------------------------------------------------------------- + +template +struct GenericImpl { + using ValuesArrayType = typename TypeTraits::ArrayType; + + KernelContext* ctx; + std::shared_ptr values; + std::shared_ptr filter; + ArrayData* out; + TypedBufferBuilder validity_builder; + + GenericImpl(KernelContext* ctx, const ExecBatch& batch, Datum* out) + : ctx(ctx), + values(batch[0].array()), + filter(batch[1].array()), + out(out->mutable_array()), + validity_builder(ctx->memory_pool()) {} + + Status FinishCommon() { + out->buffers.resize(values->buffers.size()); + out->length = validity_builder.length(); + out->null_count = validity_builder.false_count(); + return validity_builder.Finish(&out->buffers[0]); + } + + template + Status VisitFilter(InVisitor&& visit_in, NullVisitor&& visit_null) { + const auto indices_values = indices.GetValues(1); + const uint8_t* bitmap = nullptr; + if (indices.buffers[0]) { + bitmap = indices.buffers[0]->data(); } + OptionalBitIndexer values_is_valid(values->buffers[0], values->offset); + + OptionalBitBlockCounter bit_counter(bitmap, indices.offset, indices.length); + int64_t position = 0; + while (position < indices.length) { + BitBlockCount block = bit_counter.NextBlock(); + if (block.popcount == block.length) { + // Fast path, no null indices + for (int64_t i = 0; i < block.length; ++i) { + if (values_is_valid[indices_values[position]]) { + validity_builder.UnsafeAppend(true); + RETURN_NOT_OK(visit_valid(indices_values[position])); + } else { + validity_builder.UnsafeAppend(false); + RETURN_NOT_OK(visit_null()); + } + ++position; + } + } else { + for (int64_t i = 0; i < block.length; ++i) { + if (BitUtil::GetBit(bitmap, indices.offset + position) && + values_is_valid[indices_values[position]]) { + validity_builder.UnsafeAppend(true); + RETURN_NOT_OK(visit_valid(indices_values[position])); + } else { + validity_builder.UnsafeAppend(false); + RETURN_NOT_OK(visit_null()); + } + ++position; + } + } + } + return Status::OK(); + } + + virtual Status Init() { return Status::OK(); } + + // Implementation specific finish logic + virtual Status Finish() = 0; + + virtual Status ProcessFilter() = 0; + + virtual Status Exec() { + RETURN_NOT_OK(this->validity_builder.Reserve(indices->length)); + RETURN_NOT_OK(Init()); + RETURN_NOT_OK(ProcessFilter()); + RETURN_NOT_OK(FinishCommon()); + return Finish(); + } +}; + +#define LIFT_BASE_MEMBERS() \ + using ValuesArrayType = typename Base::ValuesArrayType; \ + using Base::ctx; \ + using Base::values; \ + using Base::indices; \ + using Base::out; \ + using Base::validity_builder + +static inline Status VisitNoop() { return Status::OK(); } + +// A take implementation for 32-bit and 64-bit variable binary types. 
Common +// generated kernels are shared between Binary/String and +// LargeBinary/LargeString +template +struct VarBinaryImpl : public GenericImpl { + using offset_type = typename Type::offset_type; + + using Base = GenericImpl; + LIFT_BASE_MEMBERS(); + + std::shared_ptr values_as_binary; + TypedBufferBuilder offset_builder; + TypedBufferBuilder data_builder; + + VarBinaryImpl(KernelContext* ctx, const ExecBatch& batch, Datum* out) + : Base(ctx, batch, out), + offset_builder(ctx->memory_pool()), + data_builder(ctx->memory_pool()) {} + + Status ProcessFilter() override { + ValuesArrayType typed_values(this->values_as_binary); + + // Allocate at least 32K at a time to avoid having to call Reserve for + // every value for lots of small strings + static constexpr int64_t kAllocateChunksize = 1 << 15; + RETURN_NOT_OK(data_builder.Reserve(kAllocateChunksize)); + int64_t space_available = data_builder.capacity(); + + offset_type offset = 0; + RETURN_NOT_OK(this->template VisitFilter( + [&](IndexCType index) { + offset_builder.UnsafeAppend(offset); + auto val = typed_values.GetView(index); + offset_type value_size = static_cast(val.size()); + offset += value_size; + if (ARROW_PREDICT_FALSE(value_size > space_available)) { + RETURN_NOT_OK(data_builder.Reserve(value_size + kAllocateChunksize)); + space_available = data_builder.capacity() - data_builder.length(); + } + data_builder.UnsafeAppend(reinterpret_cast(val.data()), + value_size); + space_available -= value_size; + return Status::OK(); + }, + [&]() { + offset_builder.UnsafeAppend(offset); + return Status::OK(); + })); + offset_builder.UnsafeAppend(offset); + return Status::OK(); + } + + Status Init() override { + ARROW_ASSIGN_OR_RAISE(this->values_as_binary, + GetArrayView(this->values, TypeTraits::type_singleton())); + return offset_builder.Reserve(indices->length + 1); + } + + Status Finish() override { + RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1])); + return data_builder.Finish(&out->buffers[2]); } }; -struct FilterKernelVisitor { - template - Status Visit(const Type&) { - this->result = FilterFunctor::Exec; +struct FSBImpl : public GenericImpl { + using Base = GenericImpl; + LIFT_BASE_MEMBERS(); + + TypedBufferBuilder data_builder; + + FSBImpl(KernelContext* ctx, const ExecBatch& batch, Datum* out) + : Base(ctx, batch, out), data_builder(ctx->memory_pool()) {} + + Status ProcessFilter() { + FixedSizeBinaryArray typed_values(this->values); + int32_t value_size = typed_values.byte_width(); + + RETURN_NOT_OK(data_builder.Reserve(value_size * indices->length)); + RETURN_NOT_OK(this->template VisitFilter( + [&](IndexCType index) { + auto val = typed_values.GetView(index); + data_builder.UnsafeAppend(reinterpret_cast(val.data()), + value_size); + return Status::OK(); + }, + [&]() { + data_builder.UnsafeAppend(value_size, static_cast(0x00)); + return Status::OK(); + })); return Status::OK(); } - Status Create(const DataType& type) { return VisitTypeInline(type, this); } - ArrayKernelExec result; + Status Finish() override { return data_builder.Finish(&out->buffers[1]); } }; -Status GetFilterKernel(const DataType& type, ArrayKernelExec* exec) { - FilterKernelVisitor visitor; - RETURN_NOT_OK(visitor.Create(type)); - *exec = visitor.result; - return Status::OK(); +template +struct ListImpl : public GenericImpl, Type> { + using offset_type = typename Type::offset_type; + + using Base = GenericImpl; + LIFT_BASE_MEMBERS(); + + TypedBufferBuilder offset_builder; + typename TypeTraits::OffsetBuilderType child_index_builder; + + 
+  ListImpl(KernelContext* ctx, const ExecBatch& batch, Datum* out)
+      : Base(ctx, batch, out),
+        offset_builder(ctx->memory_pool()),
+        child_index_builder(ctx->memory_pool()) {}
+
+  Status ProcessFilter() override {
+    ValuesArrayType typed_values(this->values);
+
+    offset_type offset = 0;
+    auto PushValidIndex = [&](IndexCType index) {
+      offset_builder.UnsafeAppend(offset);
+      offset_type value_offset = typed_values.value_offset(index);
+      offset_type value_length = typed_values.value_length(index);
+      offset += value_length;
+      RETURN_NOT_OK(child_index_builder.Reserve(value_length));
+      for (offset_type j = value_offset; j < value_offset + value_length; ++j) {
+        child_index_builder.UnsafeAppend(j);
+      }
+      return Status::OK();
+    };
+
+    auto PushNullIndex = [&]() {
+      offset_builder.UnsafeAppend(offset);
+      return Status::OK();
+    };
+
+    RETURN_NOT_OK(this->template VisitFilter(std::move(PushValidIndex),
+                                             std::move(PushNullIndex)));
+    offset_builder.UnsafeAppend(offset);
+    return Status::OK();
+  }
+
+  Status Init() override {
+    RETURN_NOT_OK(offset_builder.Reserve(indices->length + 1));
+    return Status::OK();
+  }
+
+  Status Finish() override {
+    std::shared_ptr<Array> child_indices;
+    RETURN_NOT_OK(child_index_builder.Finish(&child_indices));
+
+    ValuesArrayType typed_values(this->values);
+
+    // No need to boundscheck the child values indices
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> taken_child,
+                          Take(*typed_values.values(), *child_indices,
+                               TakeOptions::NoBoundscheck(), ctx->exec_context()));
+    RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
+    out->child_data = {taken_child->data()};
+    return Status::OK();
+  }
+};
+
+struct FSLImpl : public GenericImpl<FSLImpl, FixedSizeListType> {
+  Int64Builder child_index_builder;
+
+  using Base = GenericImpl<FSLImpl, FixedSizeListType>;
+  LIFT_BASE_MEMBERS();
+
+  FSLImpl(KernelContext* ctx, const ExecBatch& batch, Datum* out)
+      : Base(ctx, batch, out), child_index_builder(ctx->memory_pool()) {}
+
+  template <typename IndexCType>
+  Status ProcessIndices() {
+    ValuesArrayType typed_values(this->values);
+    int32_t list_size = typed_values.list_type()->list_size();
+
+    /// We must take list_size elements even for null elements of
+    /// indices.
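+    /// A null output slot still consumes list_size entries in the child
+    /// index builder (AppendNulls(list_size) below), which keeps the child
+    /// data aligned with the fixed-size layout.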
+    RETURN_NOT_OK(child_index_builder.Reserve(indices->length * list_size));
+    return this->template VisitFilter(
+        [&](IndexCType index) {
+          int64_t offset = index * list_size;
+          for (int64_t j = offset; j < offset + list_size; ++j) {
+            child_index_builder.UnsafeAppend(j);
+          }
+          return Status::OK();
+        },
+        [&]() { return child_index_builder.AppendNulls(list_size); });
+  }
+
+  Status Finish() override {
+    std::shared_ptr<Array> child_indices;
+    RETURN_NOT_OK(child_index_builder.Finish(&child_indices));
+
+    ValuesArrayType typed_values(this->values);
+
+    // No need to boundscheck the child values indices
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> taken_child,
+                          Take(*typed_values.values(), *child_indices,
+                               TakeOptions::NoBoundscheck(), ctx->exec_context()));
+    out->child_data = {taken_child->data()};
+    return Status::OK();
+  }
+};
+
+struct StructImpl : public GenericImpl<StructImpl, StructType> {
+  using Base = GenericImpl<StructImpl, StructType>;
+  LIFT_BASE_MEMBERS();
+
+  StructImpl(KernelContext* ctx, const ExecBatch& batch, Datum* out)
+      : Base(ctx, batch, out) {}
+
+  template <typename IndexCType>
+  Status ProcessIndices() {
+    StructArray typed_values(values);
+    return this->template VisitFilter(
+        [&](IndexCType index) { return Status::OK(); },
+        /*visit_null=*/VisitNoop);
+  }
+
+  Status Finish() override {
+    StructArray typed_values(values);
+
+    // Select from children without boundschecking
+    out->child_data.resize(values->type->num_fields());
+    for (int field_index = 0; field_index < values->type->num_fields(); ++field_index) {
+      ARROW_ASSIGN_OR_RAISE(Datum taken_field,
+                            Take(Datum(typed_values.field(field_index)), Datum(indices),
+                                 TakeOptions::NoBoundscheck(), ctx->exec_context()));
+      out->child_data[field_index] = taken_field.array();
+    }
+    return Status::OK();
+  }
+};
+
+template <typename Impl>
+static void GenericExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  const auto& state = checked_cast<const FilterState&>(*ctx->state());
+  Impl kernel(ctx, batch, out);
+  KERNEL_RETURN_IF_ERROR(ctx, kernel.Exec());
+}
+
+// ----------------------------------------------------------------------
+
 Result<std::shared_ptr<RecordBatch>> FilterRecordBatch(const RecordBatch& batch,
                                                        const Datum& filter,
                                                        const FunctionOptions* options,
@@ -218,26 +752,44 @@ class FilterMetaFunction : public MetaFunction {
 
 void RegisterVectorFilter(FunctionRegistry* registry) {
   VectorKernel base;
-  base.init = InitFilter;
+  base.init = InitWrapOptions<FilterOptions>;
 
-  auto filter = std::make_shared<VectorFunction>("array_filter", Arity::Binary());
+  auto array_filter = std::make_shared<VectorFunction>("array_filter", Arity::Binary());
   InputType filter_ty = InputType::Array(boolean());
   OutputType out_ty(FirstType);
 
-  auto AddKernel = [&](InputType in_ty, const DataType& example_type) {
-    base.signature = KernelSignature::Make({in_ty, filter_ty}, out_ty);
-    DCHECK_OK(GetFilterKernel(example_type, &base.exec));
-    DCHECK_OK(filter->AddKernel(base));
+  auto AddKernel = [&](InputType value_ty, ArrayKernelExec exec) {
+    base.signature =
+        KernelSignature::Make({value_ty, filter_ty}, OutputType(FirstType));
+    base.exec = exec;
+    DCHECK_OK(array_filter->AddKernel(base));
   };
 
-  for (const auto& value_ty : PrimitiveTypes()) {
-    AddKernel(InputType::Array(value_ty), *value_ty);
-  }
-  // Other types where we may match only on the DataType::id
-  for (const auto& value_ty : ExampleParametricTypes()) {
-    AddKernel(InputType::Array(value_ty->id()), *value_ty);
-  }
-  DCHECK_OK(registry->AddFunction(std::move(filter)));
+  // Single kernel entry point for all primitive types
+  AddKernel(InputType(match::Primitive(), ValueDescr::ARRAY), PrimitiveExec);
+
+  // Implementations for Binary, String,
LargeBinary, LargeString, and + // FixedSizeBinary + AddKernel(InputType(match::BinaryLike(), ValueDescr::ARRAY), + GenericExec>); + AddKernel(InputType(match::LargeBinaryLike(), ValueDescr::ARRAY), + GenericExec>); + AddKernel(InputType::Array(Type::FIXED_SIZE_BINARY), GenericExec); + + AddKernel(InputType::Array(null()), NullExec); + AddKernel(InputType::Array(Type::DECIMAL), GenericExec); + AddKernel(InputType::Array(Type::DICTIONARY), DictionaryExec); + AddKernel(InputType::Array(Type::EXTENSION), ExtensionExec); + AddKernel(InputType::Array(Type::LIST), GenericExec>); + AddKernel(InputType::Array(Type::LARGE_LIST), + GenericExec>); + AddKernel(InputType::Array(Type::FIXED_SIZE_LIST), GenericExec); + AddKernel(InputType::Array(Type::STRUCT), GenericExec); + + // TODO: Reuse ListType kernel for MAP + AddKernel(InputType::Array(Type::MAP), GenericExec>); + + DCHECK_OK(registry->AddFunction(std::move(array_filter))); // Add filter metafunction DCHECK_OK(registry->AddFunction(std::make_shared())); diff --git a/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc b/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc index 6031a907dee..822363ac554 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc @@ -90,7 +90,7 @@ struct TakeBenchmark { args(state), rand(kSeed), indices_have_nulls(indices_have_nulls), - monotonic_indices(false) {} + monotonic_indices(monotonic_indices) {} void Int64() { const int64_t array_size = args.size / sizeof(int64_t); @@ -123,10 +123,10 @@ struct TakeBenchmark { } void Bench(const std::shared_ptr& values) { - bool indices_null_proportion = indices_have_nulls ? args.null_proportion : 0; + double indices_null_proportion = indices_have_nulls ? args.null_proportion : 0; auto indices = - rand.Int32(static_cast(values->length()), 0, - static_cast(values->length() - 1), indices_null_proportion); + rand.Int32(values->length(), 0, static_cast(values->length() - 1), + indices_null_proportion); if (monotonic_indices) { auto arg_sorter = *SortToIndices(*indices); @@ -277,12 +277,12 @@ void TakeSetArgs(benchmark::internal::Benchmark* bench) { BENCHMARK(TakeInt64RandomIndicesNoNulls)->Apply(TakeSetArgs); BENCHMARK(TakeInt64RandomIndicesWithNulls)->Apply(TakeSetArgs); +BENCHMARK(TakeInt64MonotonicIndices)->Apply(TakeSetArgs); BENCHMARK(TakeFSLInt64RandomIndicesNoNulls)->Apply(TakeSetArgs); BENCHMARK(TakeFSLInt64RandomIndicesWithNulls)->Apply(TakeSetArgs); +BENCHMARK(TakeFSLInt64MonotonicIndices)->Apply(TakeSetArgs); BENCHMARK(TakeStringRandomIndicesNoNulls)->Apply(TakeSetArgs); BENCHMARK(TakeStringRandomIndicesWithNulls)->Apply(TakeSetArgs); -BENCHMARK(TakeInt64MonotonicIndices)->Apply(TakeSetArgs); -BENCHMARK(TakeFSLInt64MonotonicIndices)->Apply(TakeSetArgs); BENCHMARK(TakeStringMonotonicIndices)->Apply(TakeSetArgs); } // namespace compute diff --git a/cpp/src/arrow/compute/kernels/vector_selection_internal.h b/cpp/src/arrow/compute/kernels/vector_selection_internal.h deleted file mode 100644 index 6333ef3a671..00000000000 --- a/cpp/src/arrow/compute/kernels/vector_selection_internal.h +++ /dev/null @@ -1,778 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include -#include - -#include "arrow/builder.h" -#include "arrow/compute/api_vector.h" -#include "arrow/compute/kernels/common.h" -#include "arrow/record_batch.h" -#include "arrow/result.h" - -namespace arrow { -namespace compute { -namespace internal { - -template -using enable_if_not_base_binary = - enable_if_t::value, R>; - -// For non-binary builders, use regular value append -template -static enable_if_not_base_binary UnsafeAppend( - Builder* builder, Scalar&& value) { - builder->UnsafeAppend(std::forward(value)); - return Status::OK(); -} - -// For binary builders, need to reserve byte storage first -template -static enable_if_base_binary UnsafeAppend( - Builder* builder, util::string_view value) { - RETURN_NOT_OK(builder->ReserveData(static_cast(value.size()))); - builder->UnsafeAppend(value); - return Status::OK(); -} - -/// \brief visit indices from an IndexSequence while bounds checking -/// -/// \param[in] indices IndexSequence to visit -/// \param[in] values array to bounds check against, if necessary -/// \param[in] vis index visitor, signature must be Status(int64_t index, bool is_valid) -template -Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { - for (int64_t i = 0; i < indices.length(); ++i) { - auto index_valid = indices.Next(); - if (SomeIndicesNull && !index_valid.second) { - RETURN_NOT_OK(vis(0, false)); - continue; - } - - auto index = index_valid.first; - if (!NeverOutOfBounds) { - if (index < 0 || index >= values.length()) { - return Status::IndexError("take index out of bounds"); - } - } else { - DCHECK_GE(index, 0); - DCHECK_LT(index, values.length()); - } - - bool is_valid = !SomeValuesNull || values.IsValid(index); - RETURN_NOT_OK(vis(index, is_valid)); - } - return Status::OK(); -} - -template -Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { - if (indices.never_out_of_bounds()) { - return VisitIndices( - indices, values, std::forward(vis)); - } - return VisitIndices(indices, values, - std::forward(vis)); -} - -template -Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { - if (values.null_count() == 0) { - return VisitIndices(indices, values, - std::forward(vis)); - } - return VisitIndices(indices, values, std::forward(vis)); -} - -template -Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { - if (indices.null_count() == 0) { - return VisitIndices(indices, values, std::forward(vis)); - } - return VisitIndices(indices, values, std::forward(vis)); -} - -// Helper class for gathering values from an array -template -class Taker { - public: - explicit Taker(const std::shared_ptr& type) : type_(type) {} - - virtual ~Taker() = default; - - // initialize this taker including constructing any children, - // must be called once after construction before any other methods are called - virtual Status Init() { return 
Status::OK(); } - - // reset this Taker and set KernelContext for taking an array - // must be called each time the KernelContext may have changed - virtual Status SetContext(KernelContext* ctx) = 0; - - // gather elements from an array at the provided indices - virtual Status Take(const Array& values, IndexSequence indices) = 0; - - // assemble an array of all gathered values - virtual Status Finish(std::shared_ptr*) = 0; - - // factory; the output Taker will support gathering values of the given type - static Status Make(const std::shared_ptr& type, std::unique_ptr* out); - - static_assert(std::is_literal_type::value, - "Index sequences must be literal type"); - - static_assert(std::is_copy_constructible::value, - "Index sequences must be copy constructible"); - - static_assert(std::is_same().Next()), - std::pair>::value, - "An index sequence must yield pairs of indices:int64_t, validity:bool."); - - static_assert(std::is_same().length()), - int64_t>::value, - "An index sequence must provide its length."); - - static_assert(std::is_same().null_count()), - int64_t>::value, - "An index sequence must provide the number of nulls it will take."); - - static_assert( - std::is_same().never_out_of_bounds()), - bool>::value, - "Index sequences must declare whether bounds checking is necessary"); - - static_assert( - std::is_same().set_never_out_of_bounds()), - void>::value, - "An index sequence must support ignoring bounds checking."); - - protected: - template - Status MakeBuilder(MemoryPool* pool, std::unique_ptr* out) { - std::unique_ptr builder; - RETURN_NOT_OK(arrow::MakeBuilder(pool, type_, &builder)); - out->reset(checked_cast(builder.release())); - return Status::OK(); - } - - std::shared_ptr type_; -}; - -// an IndexSequence which yields indices from a specified range -// or yields null for the length of that range -class RangeIndexSequence { - public: - constexpr bool never_out_of_bounds() const { return true; } - void set_never_out_of_bounds() {} - - constexpr RangeIndexSequence() = default; - - RangeIndexSequence(bool is_valid, int64_t offset, int64_t length) - : is_valid_(is_valid), index_(offset), length_(length) {} - - std::pair Next() { return std::make_pair(index_++, is_valid_); } - - int64_t length() const { return length_; } - - int64_t null_count() const { return is_valid_ ? 
0 : length_; } - - private: - bool is_valid_ = true; - int64_t index_ = 0, length_ = -1; -}; - -// an IndexSequence which yields the values of an Array of integers -template -class ArrayIndexSequence { - public: - bool never_out_of_bounds() const { return never_out_of_bounds_; } - void set_never_out_of_bounds() { never_out_of_bounds_ = true; } - - constexpr ArrayIndexSequence() = default; - - explicit ArrayIndexSequence(const Array& indices) - : indices_(&checked_cast&>(indices)) {} - - std::pair Next() { - if (indices_->IsNull(index_)) { - ++index_; - return std::make_pair(-1, false); - } - return std::make_pair(indices_->Value(index_++), true); - } - - int64_t length() const { return indices_->length(); } - - int64_t null_count() const { return indices_->null_count(); } - - private: - const NumericArray* indices_ = nullptr; - int64_t index_ = 0; - bool never_out_of_bounds_ = false; -}; - -// Default implementation: taking from a simple array into a builder requires only that -// the array supports array.GetView() and the corresponding builder supports -// builder.UnsafeAppend(array.GetView()) -template -class TakerImpl : public Taker { - public: - using ArrayType = typename TypeTraits::ArrayType; - using BuilderType = typename TypeTraits::BuilderType; - - using Taker::Taker; - - Status SetContext(KernelContext* ctx) override { - return this->MakeBuilder(ctx->memory_pool(), &builder_); - } - - Status Take(const Array& values, IndexSequence indices) override { - DCHECK(this->type_->Equals(values.type())); - RETURN_NOT_OK(builder_->Reserve(indices.length())); - return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { - if (!is_valid) { - builder_->UnsafeAppendNull(); - return Status::OK(); - } - auto value = checked_cast(values).GetView(index); - return UnsafeAppend(builder_.get(), value); - }); - } - - Status Finish(std::shared_ptr* out) override { return builder_->Finish(out); } - - private: - std::unique_ptr builder_; -}; - -// Gathering from NullArrays is trivial; skip the builder and just -// do bounds checking -template -class TakerImpl : public Taker { - public: - using Taker::Taker; - - Status SetContext(KernelContext*) override { return Status::OK(); } - - Status Take(const Array& values, IndexSequence indices) override { - DCHECK(this->type_->Equals(values.type())); - - length_ += indices.length(); - - if (indices.never_out_of_bounds()) { - return Status::OK(); - } - - return VisitIndices(indices, values, [](int64_t, bool) { return Status::OK(); }); - } - - Status Finish(std::shared_ptr* out) override { - out->reset(new NullArray(length_)); - return Status::OK(); - } - - private: - int64_t length_ = 0; -}; - -template -class ListTakerImpl : public Taker { - public: - using offset_type = typename TypeClass::offset_type; - using ArrayType = typename TypeTraits::ArrayType; - - using Taker::Taker; - - Status Init() override { - const auto& list_type = checked_cast(*this->type_); - return Taker::Make(list_type.value_type(), &value_taker_); - } - - Status SetContext(KernelContext* ctx) override { - auto pool = ctx->memory_pool(); - null_bitmap_builder_.reset(new TypedBufferBuilder(pool)); - offset_builder_.reset(new TypedBufferBuilder(pool)); - RETURN_NOT_OK(offset_builder_->Append(0)); - return value_taker_->SetContext(ctx); - } - - Status Take(const Array& values, IndexSequence indices) override { - DCHECK(this->type_->Equals(values.type())); - - const auto& list_array = checked_cast(values); - - RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); - 
RETURN_NOT_OK(offset_builder_->Reserve(indices.length())); - - offset_type offset = offset_builder_->data()[offset_builder_->length() - 1]; - return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { - null_bitmap_builder_->UnsafeAppend(is_valid); - - if (is_valid) { - offset += list_array.value_length(index); - RangeIndexSequence value_indices(true, list_array.value_offset(index), - list_array.value_length(index)); - RETURN_NOT_OK(value_taker_->Take(*list_array.values(), value_indices)); - } - - offset_builder_->UnsafeAppend(offset); - return Status::OK(); - }); - } - - Status Finish(std::shared_ptr* out) override { - auto null_count = null_bitmap_builder_->false_count(); - auto length = null_bitmap_builder_->length(); - - std::shared_ptr offsets, null_bitmap; - RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); - RETURN_NOT_OK(offset_builder_->Finish(&offsets)); - - std::shared_ptr taken_values; - RETURN_NOT_OK(value_taker_->Finish(&taken_values)); - - out->reset(new ArrayType(this->type_, length, offsets, taken_values, null_bitmap, - null_count)); - return Status::OK(); - } - - std::unique_ptr> null_bitmap_builder_; - std::unique_ptr> offset_builder_; - std::unique_ptr> value_taker_; -}; - -template -class TakerImpl : public ListTakerImpl { - using ListTakerImpl::ListTakerImpl; -}; - -template -class TakerImpl - : public ListTakerImpl { - using ListTakerImpl::ListTakerImpl; -}; - -template -class TakerImpl : public ListTakerImpl { - using ListTakerImpl::ListTakerImpl; -}; - -template -class TakerImpl : public Taker { - public: - using Taker::Taker; - - Status Init() override { - const auto& list_type = checked_cast(*this->type_); - return Taker::Make(list_type.value_type(), &value_taker_); - } - - Status SetContext(KernelContext* ctx) override { - auto pool = ctx->memory_pool(); - null_bitmap_builder_.reset(new TypedBufferBuilder(pool)); - return value_taker_->SetContext(ctx); - } - - Status Take(const Array& values, IndexSequence indices) override { - DCHECK(this->type_->Equals(values.type())); - - const auto& list_array = checked_cast(values); - auto list_size = list_array.list_type()->list_size(); - - RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); - return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { - null_bitmap_builder_->UnsafeAppend(is_valid); - - // for FixedSizeList, null lists are not empty (they also span a segment of - // list_size in the child data), so we must append to value_taker_ even if !is_valid - RangeIndexSequence value_indices(is_valid, list_array.value_offset(index), - list_size); - return value_taker_->Take(*list_array.values(), value_indices); - }); - } - - Status Finish(std::shared_ptr* out) override { - auto null_count = null_bitmap_builder_->false_count(); - auto length = null_bitmap_builder_->length(); - - std::shared_ptr null_bitmap; - RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); - - std::shared_ptr taken_values; - RETURN_NOT_OK(value_taker_->Finish(&taken_values)); - - out->reset(new FixedSizeListArray(this->type_, length, taken_values, null_bitmap, - null_count)); - return Status::OK(); - } - - protected: - std::unique_ptr> null_bitmap_builder_; - std::unique_ptr> value_taker_; -}; - -template -class TakerImpl : public Taker { - public: - using Taker::Taker; - - Status Init() override { - children_.resize(this->type_->num_fields()); - for (int i = 0; i < this->type_->num_fields(); ++i) { - RETURN_NOT_OK( - Taker::Make(this->type_->field(i)->type(), &children_[i])); - } - return 
Status::OK(); - } - - Status SetContext(KernelContext* ctx) override { - null_bitmap_builder_.reset(new TypedBufferBuilder(ctx->memory_pool())); - for (int i = 0; i < this->type_->num_fields(); ++i) { - RETURN_NOT_OK(children_[i]->SetContext(ctx)); - } - return Status::OK(); - } - - Status Take(const Array& values, IndexSequence indices) override { - DCHECK(this->type_->Equals(values.type())); - - RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); - RETURN_NOT_OK(VisitIndices(indices, values, [&](int64_t, bool is_valid) { - null_bitmap_builder_->UnsafeAppend(is_valid); - return Status::OK(); - })); - - // bounds checking was done while appending to the null bitmap - indices.set_never_out_of_bounds(); - - const auto& struct_array = checked_cast(values); - for (int i = 0; i < this->type_->num_fields(); ++i) { - RETURN_NOT_OK(children_[i]->Take(*struct_array.field(i), indices)); - } - return Status::OK(); - } - - Status Finish(std::shared_ptr* out) override { - auto null_count = null_bitmap_builder_->false_count(); - auto length = null_bitmap_builder_->length(); - std::shared_ptr null_bitmap; - RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); - - ArrayVector fields(this->type_->num_fields()); - for (int i = 0; i < this->type_->num_fields(); ++i) { - RETURN_NOT_OK(children_[i]->Finish(&fields[i])); - } - - out->reset( - new StructArray(this->type_, length, std::move(fields), null_bitmap, null_count)); - return Status::OK(); - } - - protected: - std::unique_ptr> null_bitmap_builder_; - std::vector>> children_; -}; - -template -class TakerImpl : public Taker { - public: - using Taker::Taker; - - Status Init() override { - union_type_ = checked_cast(this->type_.get()); - - if (union_type_->mode() == UnionMode::SPARSE) { - sparse_children_.resize(this->type_->num_fields()); - } else { - dense_children_.resize(this->type_->num_fields()); - child_length_.resize(union_type_->max_type_code() + 1); - } - - for (int i = 0; i < this->type_->num_fields(); ++i) { - if (union_type_->mode() == UnionMode::SPARSE) { - RETURN_NOT_OK(Taker::Make(this->type_->field(i)->type(), - &sparse_children_[i])); - } else { - RETURN_NOT_OK(Taker>::Make( - this->type_->field(i)->type(), &dense_children_[i])); - } - } - - return Status::OK(); - } - - Status SetContext(KernelContext* ctx) override { - pool_ = ctx->memory_pool(); - null_bitmap_builder_.reset(new TypedBufferBuilder(pool_)); - type_code_builder_.reset(new TypedBufferBuilder(pool_)); - - if (union_type_->mode() == UnionMode::DENSE) { - offset_builder_.reset(new TypedBufferBuilder(pool_)); - std::fill(child_length_.begin(), child_length_.end(), 0); - } - - for (int i = 0; i < this->type_->num_fields(); ++i) { - if (union_type_->mode() == UnionMode::SPARSE) { - RETURN_NOT_OK(sparse_children_[i]->SetContext(ctx)); - } else { - RETURN_NOT_OK(dense_children_[i]->SetContext(ctx)); - } - } - - return Status::OK(); - } - - Status Take(const Array& values, IndexSequence indices) override { - DCHECK(this->type_->Equals(values.type())); - const auto& union_array = checked_cast(values); - auto type_codes = union_array.raw_type_codes(); - - if (union_type_->mode() == UnionMode::SPARSE) { - RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); - RETURN_NOT_OK(type_code_builder_->Reserve(indices.length())); - RETURN_NOT_OK(VisitIndices(indices, values, [&](int64_t index, bool is_valid) { - null_bitmap_builder_->UnsafeAppend(is_valid); - type_code_builder_->UnsafeAppend(type_codes[index]); - return Status::OK(); - })); - - // bounds checking was 
done while appending to the null bitmap - indices.set_never_out_of_bounds(); - - for (int i = 0; i < this->type_->num_fields(); ++i) { - RETURN_NOT_OK(sparse_children_[i]->Take(*union_array.field(i), indices)); - } - } else { - // Gathering from the offsets into child arrays is a bit tricky. - std::vector child_counts(union_type_->max_type_code() + 1); - RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); - RETURN_NOT_OK(type_code_builder_->Reserve(indices.length())); - RETURN_NOT_OK(VisitIndices(indices, values, [&](int64_t index, bool is_valid) { - null_bitmap_builder_->UnsafeAppend(is_valid); - type_code_builder_->UnsafeAppend(type_codes[index]); - child_counts[type_codes[index]] += is_valid; - return Status::OK(); - })); - - // bounds checking was done while appending to the null bitmap - indices.set_never_out_of_bounds(); - - // Allocate temporary storage for the offsets of all valid slots - auto child_offsets_storage_size = - std::accumulate(child_counts.begin(), child_counts.end(), 0); - ARROW_ASSIGN_OR_RAISE( - std::shared_ptr child_offsets_storage, - AllocateBuffer(child_offsets_storage_size * sizeof(int32_t), pool_)); - - // Partition offsets by type_code: child_offset_partitions[type_code] will - // point to storage for child_counts[type_code] offsets - std::vector child_offset_partitions(child_counts.size()); - auto child_offsets_storage_data = GetInt32(child_offsets_storage); - for (auto type_code : union_type_->type_codes()) { - child_offset_partitions[type_code] = child_offsets_storage_data; - child_offsets_storage_data += child_counts[type_code]; - } - DCHECK_EQ(child_offsets_storage_data - GetInt32(child_offsets_storage), - child_offsets_storage_size); - - // Fill child_offsets_storage with the taken offsets - RETURN_NOT_OK(offset_builder_->Reserve(indices.length())); - RETURN_NOT_OK(VisitIndices(indices, values, [&](int64_t index, bool is_valid) { - auto type_code = type_codes[index]; - if (is_valid) { - offset_builder_->UnsafeAppend(child_length_[type_code]++); - *child_offset_partitions[type_code] = union_array.value_offset(index); - ++child_offset_partitions[type_code]; - } else { - offset_builder_->UnsafeAppend(0); - } - return Status::OK(); - })); - - // Take from each child at those offsets - int64_t taken_offset_begin = 0; - for (int i = 0; i < this->type_->num_fields(); ++i) { - auto type_code = union_type_->type_codes()[i]; - auto length = child_counts[type_code]; - Int32Array taken_offsets(length, SliceBuffer(child_offsets_storage, - sizeof(int32_t) * taken_offset_begin, - sizeof(int32_t) * length)); - ArrayIndexSequence child_indices(taken_offsets); - child_indices.set_never_out_of_bounds(); - RETURN_NOT_OK(dense_children_[i]->Take(*union_array.field(i), child_indices)); - taken_offset_begin += length; - } - } - - return Status::OK(); - } - - Status Finish(std::shared_ptr* out) override { - auto null_count = null_bitmap_builder_->false_count(); - auto length = null_bitmap_builder_->length(); - std::shared_ptr null_bitmap, type_codes; - RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); - RETURN_NOT_OK(type_code_builder_->Finish(&type_codes)); - - std::shared_ptr offsets; - if (union_type_->mode() == UnionMode::DENSE) { - RETURN_NOT_OK(offset_builder_->Finish(&offsets)); - } - - ArrayVector fields(this->type_->num_fields()); - for (int i = 0; i < this->type_->num_fields(); ++i) { - if (union_type_->mode() == UnionMode::SPARSE) { - RETURN_NOT_OK(sparse_children_[i]->Finish(&fields[i])); - } else { - 
RETURN_NOT_OK(dense_children_[i]->Finish(&fields[i])); - } - } - - out->reset(new UnionArray(this->type_, length, std::move(fields), type_codes, offsets, - null_bitmap, null_count)); - return Status::OK(); - } - - protected: - int32_t* GetInt32(const std::shared_ptr& b) const { - return reinterpret_cast(b->mutable_data()); - } - - const UnionType* union_type_ = nullptr; - MemoryPool* pool_ = nullptr; - std::unique_ptr> null_bitmap_builder_; - std::unique_ptr> type_code_builder_; - std::unique_ptr> offset_builder_; - std::vector>> sparse_children_; - std::vector>>> dense_children_; - std::vector child_length_; -}; - -// taking from a DictionaryArray is accomplished by taking from its indices -template -class TakerImpl : public Taker { - public: - using Taker::Taker; - - Status Init() override { - const auto& dict_type = checked_cast(*this->type_); - return Taker::Make(dict_type.index_type(), &index_taker_); - } - - Status SetContext(KernelContext* ctx) override { - dictionary_ = nullptr; - return index_taker_->SetContext(ctx); - } - - Status Take(const Array& values, IndexSequence indices) override { - DCHECK(this->type_->Equals(values.type())); - const auto& dict_array = checked_cast(values); - - if (dictionary_ != nullptr && dictionary_ != dict_array.dictionary()) { - return Status::NotImplemented( - "taking from DictionaryArrays with different dictionaries"); - } else { - dictionary_ = dict_array.dictionary(); - } - return index_taker_->Take(*dict_array.indices(), indices); - } - - Status Finish(std::shared_ptr* out) override { - std::shared_ptr taken_indices; - RETURN_NOT_OK(index_taker_->Finish(&taken_indices)); - out->reset(new DictionaryArray(this->type_, taken_indices, dictionary_)); - return Status::OK(); - } - - protected: - std::shared_ptr dictionary_; - std::unique_ptr> index_taker_; -}; - -// taking from an ExtensionArray is accomplished by taking from its storage -template -class TakerImpl : public Taker { - public: - using Taker::Taker; - - Status Init() override { - const auto& ext_type = checked_cast(*this->type_); - return Taker::Make(ext_type.storage_type(), &storage_taker_); - } - - Status SetContext(KernelContext* ctx) override { - return storage_taker_->SetContext(ctx); - } - - Status Take(const Array& values, IndexSequence indices) override { - DCHECK(this->type_->Equals(values.type())); - const auto& ext_array = checked_cast(values); - return storage_taker_->Take(*ext_array.storage(), indices); - } - - Status Finish(std::shared_ptr* out) override { - std::shared_ptr taken_storage; - RETURN_NOT_OK(storage_taker_->Finish(&taken_storage)); - out->reset(new ExtensionArray(this->type_, taken_storage)); - return Status::OK(); - } - - protected: - std::unique_ptr> storage_taker_; -}; - -template -struct TakerMakeImpl { - template - Status Visit(const T&) { - out_->reset(new TakerImpl(type_)); - return (*out_)->Init(); - } - - std::shared_ptr type_; - std::unique_ptr>* out_; -}; - -template -Status Taker::Make(const std::shared_ptr& type, - std::unique_ptr* out) { - TakerMakeImpl visitor{type, out}; - return VisitTypeInline(*type, &visitor); -} - -int64_t FilterOutputSize(FilterOptions::NullSelectionBehavior null_selection, - const Array& filter); - -template -Status Select(KernelContext* ctx, const Array& values, IndexSequence sequence, - std::shared_ptr* out) { - std::unique_ptr> taker; - RETURN_NOT_OK(Taker::Make(values.type(), &taker)); - RETURN_NOT_OK(taker->SetContext(ctx)); - RETURN_NOT_OK(taker->Take(values, std::move(sequence))); - return taker->Finish(out); -} - -} 
diff --git a/cpp/src/arrow/compute/kernels/vector_take.cc b/cpp/src/arrow/compute/kernels/vector_take.cc
index 1658f965212..8aae6811a2a 100644
--- a/cpp/src/arrow/compute/kernels/vector_take.cc
+++ b/cpp/src/arrow/compute/kernels/vector_take.cc
@@ -15,15 +15,32 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include
+#include
+#include
+
 #include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
 #include "arrow/array/concatenate.h"
+#include "arrow/array/data.h"
+#include "arrow/buffer_builder.h"
 #include "arrow/compute/api_vector.h"
 #include "arrow/compute/kernels/common.h"
-#include "arrow/compute/kernels/vector_selection_internal.h"
 #include "arrow/record_batch.h"
 #include "arrow/result.h"
+#include "arrow/util/bit_block_counter.h"
+#include "arrow/util/bitmap_reader.h"
+#include "arrow/util/int_util.h"
 
 namespace arrow {
+
+using internal::BitBlockCounter;
+using internal::BitmapReader;
+using internal::GetArrayView;
+using internal::IndexBoundscheck;
+using internal::OptionalBitBlockCounter;
+using internal::OptionalBitIndexer;
+
 namespace compute {
 namespace internal {
 
@@ -38,44 +55,715 @@ std::unique_ptr<KernelState> InitTake(KernelContext*, const KernelInitArgs& args
   return std::unique_ptr<KernelState>(new TakeState{*take_options});
 }
 
-template <typename ValueType, typename IndexType>
-struct TakeFunctor {
-  using ValueArrayType = typename TypeTraits<ValueType>::ArrayType;
-  using IndexArrayType = typename TypeTraits<IndexType>::ArrayType;
-  using IS = ArrayIndexSequence<IndexType>;
+namespace {}  // namespace
+
+// ----------------------------------------------------------------------
+// Implement optimized take for primitive types from boolean to 1/2/4/8-byte
+// C-type based types. Use a common implementation for every byte width and
+// only generate code for unsigned integer indices, since after boundschecking
+// (which rejects negative indices) we can safely reinterpret_cast signed
+// integers as unsigned.
+
+struct PrimitiveTakeArgs {
+  const uint8_t* values;
+  const uint8_t* values_bitmap = nullptr;
+  int values_bit_width;
+  int64_t values_length;
+  int64_t values_offset;
+  int64_t values_null_count;
+  const uint8_t* indices;
+  const uint8_t* indices_bitmap = nullptr;
+  int indices_bit_width;
+  int64_t indices_length;
+  int64_t indices_offset;
+  int64_t indices_null_count;
+};
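The signed-to-unsigned trick mentioned in the comment above is worth a concrete illustration. A self-contained sketch (not part of the patch; TakeDoubles is an invented name): once boundschecking has rejected negative and out-of-range values, every remaining index has the same bit pattern whether read as int32_t or as uint32_t, so a single kernel instantiation per unsigned index width serves both signednesses:

#include <cstdint>

// Assumes all indices are boundschecked: 0 <= indices[i] < values length.
void TakeDoubles(const int32_t* indices, int64_t n, const double* values,
                 double* out) {
  // Safe: non-negative int32_t values have identical uint32_t bit patterns.
  const uint32_t* uindices = reinterpret_cast<const uint32_t*>(indices);
  for (int64_t i = 0; i < n; ++i) {
    out[i] = values[uindices[i]];
  }
}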
+
+// Reduce code size by dealing with the unboxing of the kernel inputs once
+// rather than duplicating compiled code to do all of this in each kernel.
+PrimitiveTakeArgs GetPrimitiveTakeArgs(const ExecBatch& batch) {
+  PrimitiveTakeArgs args;
+
+  const ArrayData& arg0 = *batch[0].array();
+  const ArrayData& arg1 = *batch[1].array();
+
+  // Values
+  args.values_bit_width = static_cast<const FixedWidthType&>(*arg0.type).bit_width();
+  args.values = arg0.buffers[1]->data();
+  if (args.values_bit_width > 1) {
+    args.values += arg0.offset * args.values_bit_width / 8;
+  }
+  args.values_length = arg0.length;
+  args.values_offset = arg0.offset;
+  args.values_null_count = arg0.GetNullCount();
+  if (arg0.buffers[0]) {
+    args.values_bitmap = arg0.buffers[0]->data();
+  }
+
+  // Indices
+  args.indices_bit_width = static_cast<const FixedWidthType&>(*arg1.type).bit_width();
+  args.indices = arg1.buffers[1]->data() + arg1.offset * args.indices_bit_width / 8;
+  args.indices_length = arg1.length;
+  args.indices_offset = arg1.offset;
+  args.indices_null_count = arg1.GetNullCount();
+  if (arg1.buffers[0]) {
+    args.indices_bitmap = arg1.buffers[0]->data();
+  }
+
+  return args;
+}
+
+/// \brief The Take implementation for primitive (fixed-width) types does not
+/// use the logical Arrow type but rather the physical C type. This way we
+/// only generate one take function for each byte width.
+///
+/// This function assumes that the indices have been boundschecked.
+template <typename ValueCType, typename IndexCType>
+struct PrimitiveTakeImpl {
+  static void Exec(const PrimitiveTakeArgs& args, Datum* out_datum) {
+    auto values = reinterpret_cast<const ValueCType*>(args.values);
+    auto values_bitmap = args.values_bitmap;
+    auto values_offset = args.values_offset;
+
+    auto indices = reinterpret_cast<const IndexCType*>(args.indices);
+    auto indices_bitmap = args.indices_bitmap;
+    auto indices_offset = args.indices_offset;
+
+    ArrayData* out_arr = out_datum->mutable_array();
+    auto out = out_arr->GetMutableValues<ValueCType>(1);
+    auto out_bitmap = out_arr->buffers[0]->mutable_data();
+    auto out_offset = out_arr->offset;
+
+    // If either the values or the indices have nulls, we preemptively zero
+    // out the output validity bitmap so that we don't have to use ClearBit
+    // in each iteration for nulls.
+    if (args.values_null_count > 0 || args.indices_null_count > 0) {
+      BitUtil::SetBitsTo(out_bitmap, out_offset, args.indices_length, false);
+    }
+
+    OptionalBitBlockCounter indices_bit_counter(indices_bitmap, indices_offset,
+                                                args.indices_length);
+    int64_t position = 0;
+    int64_t valid_count = 0;
+    while (true) {
+      BitBlockCount block = indices_bit_counter.NextBlock();
+      if (block.length == 0) {
+        // All indices processed.
+        break;
+      }
+      if (args.values_null_count == 0) {
+        // Values are never null, so things are easier
+        valid_count += block.popcount;
+        if (block.popcount == block.length) {
+          // Fastest path: neither values nor indices have nulls
+          BitUtil::SetBitsTo(out_bitmap, out_offset + position, block.length, true);
+          for (int64_t i = 0; i < block.length; ++i) {
+            out[position] = values[indices[position]];
+            ++position;
+          }
+        } else if (block.popcount > 0) {
+          // Slow path: some but not all indices are null
+          for (int64_t i = 0; i < block.length; ++i) {
+            if (BitUtil::GetBit(indices_bitmap, indices_offset + position)) {
+              // index is not null
+              BitUtil::SetBit(out_bitmap, out_offset + position);
+              out[position] = values[indices[position]];
+            }
+            ++position;
+          }
+        } else {
+          // Whole block of indices is null; output stays null, skip ahead
+          position += block.length;
+        }
+      } else {
+        // Values have nulls, so we must do random access into the values bitmap
+        if (block.popcount == block.length) {
+          // Faster path: indices are not null but values may be
+          for (int64_t i = 0; i < block.length; ++i) {
+            if (BitUtil::GetBit(values_bitmap, values_offset + indices[position])) {
+              // value is not null
+              out[position] = values[indices[position]];
+              BitUtil::SetBit(out_bitmap, out_offset + position);
+              ++valid_count;
+            }
+            ++position;
+          }
+        } else if (block.popcount > 0) {
+          // Slow path: some but not all indices are null. Since we are doing
+          // random access in general we have to check the value nullness one
+          // by one.
+          for (int64_t i = 0; i < block.length; ++i) {
+            if (BitUtil::GetBit(indices_bitmap, indices_offset + position)) {
+              // index is not null
+              if (BitUtil::GetBit(values_bitmap, values_offset + indices[position])) {
+                // value is not null
+                out[position] = values[indices[position]];
+                BitUtil::SetBit(out_bitmap, out_offset + position);
+                ++valid_count;
+              }
+            }
+            ++position;
+          }
+        } else {
+          // Whole block of indices is null; output stays null, skip ahead
+          position += block.length;
+        }
+      }
+    }
+    out_arr->null_count = out_arr->length - valid_count;
+  }
+};
+
+template <typename IndexCType>
+struct BooleanTakeImpl {
+  static void Exec(const PrimitiveTakeArgs& args, Datum* out_datum) {
+    auto values = args.values;
+    auto values_bitmap = args.values_bitmap;
+    auto values_offset = args.values_offset;
+
+    auto indices = reinterpret_cast<const IndexCType*>(args.indices);
+    auto indices_bitmap = args.indices_bitmap;
+    auto indices_offset = args.indices_offset;
+
+    ArrayData* out_arr = out_datum->mutable_array();
+    auto out = out_arr->buffers[1]->mutable_data();
+    auto out_bitmap = out_arr->buffers[0]->mutable_data();
+    auto out_offset = out_arr->offset;
-  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
-    ValueArrayType values(batch[0].array());
-    IndexArrayType indices(batch[1].array());
-    std::shared_ptr<Array> result;
-    KERNEL_RETURN_IF_ERROR(ctx, Select(ctx, values, IS(indices), &result));
-    out->value = result->data();
+    // If either the values or the indices have nulls, we preemptively zero
+    // out the output validity bitmap so that we don't have to use ClearBit
+    // in each iteration for nulls.
+    if (args.values_null_count > 0 || args.indices_null_count > 0) {
+      BitUtil::SetBitsTo(out_bitmap, out_offset, args.indices_length, false);
+    }
+
+    auto PlaceDataBit = [&](int64_t loc, IndexCType index) {
+      BitUtil::SetBitTo(out, out_offset + loc,
+                        BitUtil::GetBit(values, values_offset + index));
+    };
+
+    OptionalBitBlockCounter indices_bit_counter(indices_bitmap, indices_offset,
+                                                args.indices_length);
+    int64_t position = 0;
+    int64_t valid_count = 0;
+    while (true) {
+      BitBlockCount block = indices_bit_counter.NextBlock();
+      if (block.length == 0) {
+        // All indices processed.
+        break;
+      }
+      if (args.values_null_count == 0) {
+        // Values are never null, so things are easier
+        valid_count += block.popcount;
+        if (block.popcount == block.length) {
+          // Fastest path: neither values nor indices have nulls
+          BitUtil::SetBitsTo(out_bitmap, out_offset + position, block.length, true);
+          for (int64_t i = 0; i < block.length; ++i) {
+            PlaceDataBit(position, indices[position]);
+            ++position;
+          }
+        } else if (block.popcount > 0) {
+          // Slow path: some but not all indices are null
+          for (int64_t i = 0; i < block.length; ++i) {
+            if (BitUtil::GetBit(indices_bitmap, indices_offset + position)) {
+              // index is not null
+              BitUtil::SetBit(out_bitmap, out_offset + position);
+              PlaceDataBit(position, indices[position]);
+            }
+            ++position;
+          }
+        } else {
+          // Whole block of indices is null; output stays null, skip ahead
+          position += block.length;
+        }
+      } else {
+        // Values have nulls, so we must do random access into the values bitmap
+        if (block.popcount == block.length) {
+          // Faster path: indices are not null but values may be
+          for (int64_t i = 0; i < block.length; ++i) {
+            if (BitUtil::GetBit(values_bitmap, values_offset + indices[position])) {
+              // value is not null
+              BitUtil::SetBit(out_bitmap, out_offset + position);
+              PlaceDataBit(position, indices[position]);
+              ++valid_count;
+            }
+            ++position;
+          }
+        } else if (block.popcount > 0) {
+          // Slow path: some but not all indices are null. Since we are doing
+          // random access in general we have to check the value nullness one
+          // by one.
+          for (int64_t i = 0; i < block.length; ++i) {
+            if (BitUtil::GetBit(indices_bitmap, indices_offset + position)) {
+              // index is not null
+              if (BitUtil::GetBit(values_bitmap, values_offset + indices[position])) {
+                // value is not null
+                PlaceDataBit(position, indices[position]);
+                BitUtil::SetBit(out_bitmap, out_offset + position);
+                ++valid_count;
+              }
+            }
+            ++position;
+          }
+        } else {
+          // Whole block of indices is null; output stays null, skip ahead
+          position += block.length;
+        }
+      }
+    }
+    out_arr->null_count = out_arr->length - valid_count;
+  }
+};
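With both kernels consuming the same PrimitiveTakeArgs, the remaining work is dispatch on the two physical widths; the excerpt breaks off just as the patch's own dispatch template begins below. Purely as an illustration of the shape such a layer takes, a hypothetical value-width dispatcher (TakeByValueWidth is an invented name, not the patch's actual function) might look like:

template <typename IndexCType>
void TakeByValueWidth(const PrimitiveTakeArgs& args, Datum* out) {
  // Select the kernel instantiation by physical byte width; the logical
  // Arrow type no longer matters at this point.
  switch (args.values_bit_width) {
    case 1:
      return BooleanTakeImpl<IndexCType>::Exec(args, out);
    case 8:
      return PrimitiveTakeImpl<int8_t, IndexCType>::Exec(args, out);
    case 16:
      return PrimitiveTakeImpl<int16_t, IndexCType>::Exec(args, out);
    case 32:
      return PrimitiveTakeImpl<int32_t, IndexCType>::Exec(args, out);
    case 64:
      return PrimitiveTakeImpl<int64_t, IndexCType>::Exec(args, out);
    default:
      break;  // unreachable for primitive inputs
  }
}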
-
-struct TakeKernelVisitor {
-  TakeKernelVisitor(const DataType& value_type, const DataType& index_type)
-      : value_type(value_type), index_type(index_type) {}
+template