From 65dcd907589054ab8e2ce7dd47a8aa2f01573d0c Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 21 Jun 2019 20:51:03 -0400 Subject: [PATCH 01/18] refactor Take and Filter to share code through Taker<> --- cpp/src/arrow/array.cc | 13 +- cpp/src/arrow/array.h | 8 +- cpp/src/arrow/compute/kernels/filter.cc | 421 ++----------- cpp/src/arrow/compute/kernels/filter.h | 9 +- cpp/src/arrow/compute/kernels/take-internal.h | 570 ++++++++++++++++++ cpp/src/arrow/compute/kernels/take.cc | 224 ++----- cpp/src/arrow/compute/kernels/take.h | 20 +- cpp/src/arrow/compute/kernels/util-internal.h | 2 +- 8 files changed, 714 insertions(+), 553 deletions(-) create mode 100644 cpp/src/arrow/compute/kernels/take-internal.h diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc index b41d6dda681..8fa2d76e6bf 100644 --- a/cpp/src/arrow/array.cc +++ b/cpp/src/arrow/array.cc @@ -297,12 +297,21 @@ MapArray::MapArray(const std::shared_ptr& data) { SetData(data); } MapArray::MapArray(const std::shared_ptr& type, int64_t length, const std::shared_ptr& offsets, - const std::shared_ptr& keys, const std::shared_ptr& values, const std::shared_ptr& null_bitmap, int64_t null_count, int64_t offset) { + SetData(ArrayData::Make(type, length, {null_bitmap, offsets}, {values->data()}, + null_count, offset)); +} + +MapArray::MapArray(const std::shared_ptr& type, int64_t length, + const std::shared_ptr& offsets, + const std::shared_ptr& keys, + const std::shared_ptr& items, + const std::shared_ptr& null_bitmap, int64_t null_count, + int64_t offset) { auto pair_data = ArrayData::Make(type->children()[0]->type(), keys->data()->length, - {nullptr}, {keys->data(), values->data()}, 0, offset); + {nullptr}, {keys->data(), items->data()}, 0, offset); auto map_data = ArrayData::Make(type, length, {null_bitmap, offsets}, {pair_data}, null_count, offset); SetData(map_data); diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 5cca9db3c5c..1e163b7a3ad 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -565,7 +565,13 @@ class ARROW_EXPORT MapArray : public ListArray { MapArray(const std::shared_ptr& type, int64_t length, const std::shared_ptr& value_offsets, - const std::shared_ptr& keys, const std::shared_ptr& values, + const std::shared_ptr& keys, const std::shared_ptr& items, + const std::shared_ptr& null_bitmap = NULLPTR, + int64_t null_count = kUnknownNullCount, int64_t offset = 0); + + MapArray(const std::shared_ptr& type, int64_t length, + const std::shared_ptr& value_offsets, + const std::shared_ptr& values, const std::shared_ptr& null_bitmap = NULLPTR, int64_t null_count = kUnknownNullCount, int64_t offset = 0); diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 654ec610352..965f4cbbf35 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -15,19 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include +#include "arrow/compute/kernels/filter.h" + #include #include -#include #include "arrow/builder.h" #include "arrow/compute/context.h" -#include "arrow/compute/kernels/filter.h" -#include "arrow/util/bit-util.h" +#include "arrow/compute/kernels/take-internal.h" #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" -#include "arrow/util/stl.h" -#include "arrow/visitor_inline.h" namespace arrow { namespace compute { @@ -35,32 +32,33 @@ namespace compute { using internal::checked_cast; using internal::checked_pointer_cast; -template -Status MakeBuilder(MemoryPool* pool, const std::shared_ptr& type, - std::unique_ptr* out) { - std::unique_ptr builder; - RETURN_NOT_OK(MakeBuilder(pool, type, &builder)); - out->reset(checked_cast(builder.release())); - return Status::OK(); -} +class FilterIndexSequence { + public: + static constexpr int64_t take_null_index = std::numeric_limits::min(); + static constexpr bool never_out_of_bounds = true; -template -static Status UnsafeAppend(Builder* builder, Scalar&& value) { - builder->UnsafeAppend(std::forward(value)); - return Status::OK(); -} + FilterIndexSequence(const BooleanArray& filter, int64_t out_length) + : filter_(&filter), out_length_(out_length) {} -static Status UnsafeAppend(BinaryBuilder* builder, util::string_view value) { - RETURN_NOT_OK(builder->ReserveData(static_cast(value.size()))); - builder->UnsafeAppend(value); - return Status::OK(); -} + int64_t Next() { + while (filter_->IsValid(index_) && !filter_->Value(index_)) { + ++index_; + } + if (filter_->IsNull(index_)) { + ++index_; + return take_null_index; + } + return index_++; + } -static Status UnsafeAppend(StringBuilder* builder, util::string_view value) { - RETURN_NOT_OK(builder->ReserveData(static_cast(value.size()))); - builder->UnsafeAppend(value); - return Status::OK(); -} + int64_t length() const { return out_length_; } + + int64_t null_count() const { return filter_->null_count(); } + + private: + const BooleanArray* filter_; + int64_t index_ = 0, out_length_ = -1; +}; // TODO(bkietz) this can be optimized static int64_t OutputSize(const BooleanArray& filter) { @@ -75,358 +73,29 @@ static int64_t OutputSize(const BooleanArray& filter) { return size; } -template -class FilterImpl; - -template <> -class FilterImpl : public FilterKernel { - public: - using FilterKernel::FilterKernel; - - Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - int64_t length, std::shared_ptr* out) override { - out->reset(new NullArray(length)); - return Status::OK(); - } -}; - -template -class FilterImpl : public FilterKernel { - public: - using ValueArray = typename TypeTraits::ArrayType; - using OutBuilder = typename TypeTraits::BuilderType; - - using FilterKernel::FilterKernel; - - Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - int64_t length, std::shared_ptr* out) override { - std::unique_ptr builder; - RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), type_, &builder)); - RETURN_NOT_OK(builder->Resize(OutputSize(filter))); - RETURN_NOT_OK(UnpackValuesNullCount(checked_cast(values), filter, - builder.get())); - return builder->Finish(out); - } - - private: - Status UnpackValuesNullCount(const ValueArray& values, const BooleanArray& filter, - OutBuilder* builder) { - if (values.null_count() == 0) { - return UnpackIndicesNullCount(values, filter, builder); - } - return UnpackIndicesNullCount(values, filter, builder); - } - - template - Status UnpackIndicesNullCount(const ValueArray& values, const BooleanArray& filter, - OutBuilder* builder) { - if (filter.null_count() == 0) { - return Filter(values, filter, builder); - } - return Filter(values, filter, builder); - } - - template - Status Filter(const ValueArray& values, const BooleanArray& filter, - OutBuilder* builder) { - for (int64_t i = 0; i < filter.length(); ++i) { - if (!AllIndicesValid && filter.IsNull(i)) { - builder->UnsafeAppendNull(); - continue; - } - if (!filter.Value(i)) { - continue; - } - if (!AllValuesValid && values.IsNull(i)) { - builder->UnsafeAppendNull(); - continue; - } - RETURN_NOT_OK(UnsafeAppend(builder, values.GetView(i))); - } - return Status::OK(); - } -}; - -template <> -class FilterImpl : public FilterKernel { - public: - FilterImpl(const std::shared_ptr& type, - std::vector> child_kernels) - : FilterKernel(type), child_kernels_(std::move(child_kernels)) {} - - Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - int64_t length, std::shared_ptr* out) override { - const auto& struct_array = checked_cast(values); - - TypedBufferBuilder null_bitmap_builder(ctx->memory_pool()); - RETURN_NOT_OK(null_bitmap_builder.Resize(length)); - - ArrayVector fields(type_->num_children()); - for (int i = 0; i < type_->num_children(); ++i) { - RETURN_NOT_OK(child_kernels_[i]->Filter(ctx, *struct_array.field(i), filter, length, - &fields[i])); - } - - for (int64_t i = 0; i < filter.length(); ++i) { - if (filter.IsNull(i)) { - null_bitmap_builder.UnsafeAppend(false); - continue; - } - if (!filter.Value(i)) { - continue; - } - if (struct_array.IsNull(i)) { - null_bitmap_builder.UnsafeAppend(false); - continue; - } - null_bitmap_builder.UnsafeAppend(true); - } - - auto null_count = null_bitmap_builder.false_count(); - std::shared_ptr null_bitmap; - RETURN_NOT_OK(null_bitmap_builder.Finish(&null_bitmap)); - - out->reset(new StructArray(type_, length, fields, null_bitmap, null_count)); - return Status::OK(); - } - - private: - std::vector> child_kernels_; -}; - -template <> -class FilterImpl : public FilterKernel { - public: - using FilterKernel::FilterKernel; - - Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - int64_t length, std::shared_ptr* out) override { - const auto& list_array = checked_cast(values); - - TypedBufferBuilder null_bitmap_builder(ctx->memory_pool()); - RETURN_NOT_OK(null_bitmap_builder.Resize(length)); - - BooleanBuilder value_filter_builder(ctx->memory_pool()); - auto list_size = list_array.list_type()->list_size(); - RETURN_NOT_OK(value_filter_builder.Resize(list_size * length)); - - for (int64_t i = 0; i < filter.length(); ++i) { - if (filter.IsNull(i)) { - null_bitmap_builder.UnsafeAppend(false); - for (int64_t j = 0; j < list_size; ++j) { - value_filter_builder.UnsafeAppendNull(); - } - continue; - } - if (!filter.Value(i)) { - for (int64_t j = 0; j < list_size; ++j) { - value_filter_builder.UnsafeAppend(false); - } - continue; - } - if (values.IsNull(i)) { - null_bitmap_builder.UnsafeAppend(false); - for (int64_t j = 0; j < list_size; ++j) { - value_filter_builder.UnsafeAppendNull(); - } - continue; - } - for (int64_t j = 0; j < list_size; ++j) { - value_filter_builder.UnsafeAppend(true); - } - null_bitmap_builder.UnsafeAppend(true); - } - - std::shared_ptr value_filter; - RETURN_NOT_OK(value_filter_builder.Finish(&value_filter)); - std::shared_ptr out_values; - RETURN_NOT_OK( - arrow::compute::Filter(ctx, *list_array.values(), *value_filter, &out_values)); - - auto null_count = null_bitmap_builder.false_count(); - std::shared_ptr null_bitmap; - RETURN_NOT_OK(null_bitmap_builder.Finish(&null_bitmap)); - - out->reset( - new FixedSizeListArray(type_, length, out_values, null_bitmap, null_count)); - return Status::OK(); - } -}; - -template <> -class FilterImpl : public FilterKernel { - public: - using FilterKernel::FilterKernel; - - Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - int64_t length, std::shared_ptr* out) override { - const auto& list_array = checked_cast(values); - - TypedBufferBuilder null_bitmap_builder(ctx->memory_pool()); - RETURN_NOT_OK(null_bitmap_builder.Resize(length)); - - BooleanBuilder value_filter_builder(ctx->memory_pool()); - - TypedBufferBuilder offset_builder(ctx->memory_pool()); - RETURN_NOT_OK(offset_builder.Resize(length + 1)); - int32_t offset = 0; - offset_builder.UnsafeAppend(offset); - - for (int64_t i = 0; i < filter.length(); ++i) { - if (filter.IsNull(i)) { - null_bitmap_builder.UnsafeAppend(false); - offset_builder.UnsafeAppend(offset); - RETURN_NOT_OK( - value_filter_builder.AppendValues(list_array.value_length(i), false)); - continue; - } - if (!filter.Value(i)) { - RETURN_NOT_OK( - value_filter_builder.AppendValues(list_array.value_length(i), false)); - continue; - } - if (values.IsNull(i)) { - null_bitmap_builder.UnsafeAppend(false); - offset_builder.UnsafeAppend(offset); - RETURN_NOT_OK( - value_filter_builder.AppendValues(list_array.value_length(i), false)); - continue; - } - null_bitmap_builder.UnsafeAppend(true); - offset += list_array.value_length(i); - offset_builder.UnsafeAppend(offset); - RETURN_NOT_OK(value_filter_builder.AppendValues(list_array.value_length(i), true)); - } - - std::shared_ptr value_filter; - RETURN_NOT_OK(value_filter_builder.Finish(&value_filter)); - std::shared_ptr out_values; - RETURN_NOT_OK( - arrow::compute::Filter(ctx, *list_array.values(), *value_filter, &out_values)); - - auto null_count = null_bitmap_builder.false_count(); - std::shared_ptr offsets, null_bitmap; - RETURN_NOT_OK(offset_builder.Finish(&offsets)); - RETURN_NOT_OK(null_bitmap_builder.Finish(&null_bitmap)); - - *out = MakeArray(ArrayData::Make(type_, length, {null_bitmap, offsets}, - {out_values->data()}, null_count)); - return Status::OK(); - } -}; - -template <> -class FilterImpl : public FilterImpl { - using FilterImpl::FilterImpl; -}; - -template <> -class FilterImpl : public FilterKernel { - public: - FilterImpl(const std::shared_ptr& type, std::unique_ptr impl) - : FilterKernel(type), impl_(std::move(impl)) {} - - Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, - int64_t length, std::shared_ptr* out) override { - auto dict_array = checked_cast(&values); - // To filter a dictionary, apply the current kernel to the dictionary's indices. - std::shared_ptr taken_indices; - RETURN_NOT_OK( - impl_->Filter(ctx, *dict_array->indices(), filter, length, &taken_indices)); - return DictionaryArray::FromArrays(values.type(), taken_indices, - dict_array->dictionary(), out); - } - - private: - std::unique_ptr impl_; -}; - -template <> -class FilterImpl : public FilterKernel { +class FilterKernelImpl : public FilterKernel { public: - FilterImpl(const std::shared_ptr& type, std::unique_ptr impl) - : FilterKernel(type), impl_(std::move(impl)) {} + FilterKernelImpl(const std::shared_ptr& type, + std::unique_ptr> taker) + : FilterKernel(type), taker_(std::move(taker)) {} Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, int64_t length, std::shared_ptr* out) override { - auto ext_array = checked_cast(&values); - // To take from an extension array, apply the current kernel to storage. - std::shared_ptr taken_storage; - RETURN_NOT_OK( - impl_->Filter(ctx, *ext_array->storage(), filter, length, &taken_storage)); - *out = ext_array->extension_type()->MakeArray(taken_storage->data()); - return Status::OK(); + RETURN_NOT_OK(taker_->Init(ctx->memory_pool())); + RETURN_NOT_OK(taker_->Take(values, FilterIndexSequence(filter, length))); + return taker_->Finish(out); } - private: - std::unique_ptr impl_; + std::unique_ptr> taker_; }; Status FilterKernel::Make(const std::shared_ptr& value_type, std::unique_ptr* out) { - switch (value_type->id()) { -#define NO_CHILD_CASE(T) \ - case T##Type::type_id: \ - *out = internal::make_unique>(value_type); \ - return Status::OK() - -#define SINGLE_CHILD_CASE(T, CHILD_TYPE) \ - case T##Type::type_id: { \ - auto t = checked_pointer_cast(value_type); \ - std::unique_ptr child_filter_impl; \ - RETURN_NOT_OK(FilterKernel::Make(t->CHILD_TYPE(), &child_filter_impl)); \ - *out = internal::make_unique>(t, std::move(child_filter_impl)); \ - return Status::OK(); \ - } - - NO_CHILD_CASE(Null); - NO_CHILD_CASE(Boolean); - NO_CHILD_CASE(Int8); - NO_CHILD_CASE(Int16); - NO_CHILD_CASE(Int32); - NO_CHILD_CASE(Int64); - NO_CHILD_CASE(UInt8); - NO_CHILD_CASE(UInt16); - NO_CHILD_CASE(UInt32); - NO_CHILD_CASE(UInt64); - NO_CHILD_CASE(Date32); - NO_CHILD_CASE(Date64); - NO_CHILD_CASE(Time32); - NO_CHILD_CASE(Time64); - NO_CHILD_CASE(Timestamp); - NO_CHILD_CASE(Duration); - NO_CHILD_CASE(HalfFloat); - NO_CHILD_CASE(Float); - NO_CHILD_CASE(Double); - NO_CHILD_CASE(String); - NO_CHILD_CASE(Binary); - NO_CHILD_CASE(FixedSizeBinary); - NO_CHILD_CASE(Decimal128); + std::unique_ptr> taker; + RETURN_NOT_OK(Taker::Make(value_type, &taker)); - SINGLE_CHILD_CASE(Dictionary, index_type); - SINGLE_CHILD_CASE(Extension, storage_type); - - NO_CHILD_CASE(List); - NO_CHILD_CASE(FixedSizeList); - NO_CHILD_CASE(Map); - - case Type::STRUCT: { - std::vector> child_kernels; - for (auto child : value_type->children()) { - child_kernels.emplace_back(); - RETURN_NOT_OK(FilterKernel::Make(child->type(), &child_kernels.back())); - } - *out = internal::make_unique>(value_type, - std::move(child_kernels)); - return Status::OK(); - } - -#undef NO_CHILD_CASE -#undef SINGLE_CHILD_CASE - - default: - return Status::NotImplemented("gathering values of type ", *value_type); - } + out->reset(new FilterKernelImpl(value_type, std::move(taker))); + return Status::OK(); } Status FilterKernel::Call(FunctionContext* ctx, const Datum& values, const Datum& filter, @@ -436,26 +105,26 @@ Status FilterKernel::Call(FunctionContext* ctx, const Datum& values, const Datum } auto values_array = values.make_array(); auto filter_array = checked_pointer_cast(filter.make_array()); - const auto length = OutputSize(*filter_array); std::shared_ptr out_array; - RETURN_NOT_OK(this->Filter(ctx, *values_array, *filter_array, length, &out_array)); + RETURN_NOT_OK(this->Filter(ctx, *values_array, *filter_array, OutputSize(*filter_array), + &out_array)); *out = out_array; return Status::OK(); } -Status Filter(FunctionContext* context, const Array& values, const Array& filter, +Status Filter(FunctionContext* ctx, const Array& values, const Array& filter, std::shared_ptr* out) { Datum out_datum; - RETURN_NOT_OK(Filter(context, Datum(values.data()), Datum(filter.data()), &out_datum)); + RETURN_NOT_OK(Filter(ctx, Datum(values.data()), Datum(filter.data()), &out_datum)); *out = out_datum.make_array(); return Status::OK(); } -Status Filter(FunctionContext* context, const Datum& values, const Datum& filter, +Status Filter(FunctionContext* ctx, const Datum& values, const Datum& filter, Datum* out) { std::unique_ptr kernel; RETURN_NOT_OK(FilterKernel::Make(values.type(), &kernel)); - return kernel->Call(context, values, filter, out); + return kernel->Call(ctx, values, filter, out); } } // namespace compute diff --git a/cpp/src/arrow/compute/kernels/filter.h b/cpp/src/arrow/compute/kernels/filter.h index 46ad3d42b87..401daa8c806 100644 --- a/cpp/src/arrow/compute/kernels/filter.h +++ b/cpp/src/arrow/compute/kernels/filter.h @@ -41,23 +41,22 @@ class FunctionContext; /// filter = [0, 1, 1, 0, null, 1], the output will be /// = ["b", "c", null, "f"] /// -/// \param[in] context the FunctionContext +/// \param[in] ctx the FunctionContext /// \param[in] values array to filter /// \param[in] filter indicates which values should be filtered out /// \param[out] out resulting array ARROW_EXPORT -Status Filter(FunctionContext* context, const Array& values, const Array& filter, +Status Filter(FunctionContext* ctx, const Array& values, const Array& filter, std::shared_ptr* out); /// \brief Filter an array with a boolean selection filter /// -/// \param[in] context the FunctionContext +/// \param[in] ctx the FunctionContext /// \param[in] values datum to filter /// \param[in] filter indicates which values should be filtered out /// \param[out] out resulting datum ARROW_EXPORT -Status Filter(FunctionContext* context, const Datum& values, const Datum& filter, - Datum* out); +Status Filter(FunctionContext* ctx, const Datum& values, const Datum& filter, Datum* out); /// \brief BinaryKernel implementing Filter operation class ARROW_EXPORT FilterKernel : public BinaryKernel { diff --git a/cpp/src/arrow/compute/kernels/take-internal.h b/cpp/src/arrow/compute/kernels/take-internal.h new file mode 100644 index 00000000000..aec8ba5bb8b --- /dev/null +++ b/cpp/src/arrow/compute/kernels/take-internal.h @@ -0,0 +1,570 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "arrow/builder.h" +#include "arrow/compute/context.h" +#include "arrow/util/bit-util.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" +#include "arrow/util/stl.h" +#include "arrow/visitor_inline.h" + +namespace arrow { +namespace compute { + +using internal::checked_cast; +using internal::checked_pointer_cast; + +template +static Status UnsafeAppend(Builder* builder, Scalar&& value) { + builder->UnsafeAppend(std::forward(value)); + return Status::OK(); +} + +static Status UnsafeAppend(BinaryBuilder* builder, util::string_view value) { + RETURN_NOT_OK(builder->ReserveData(static_cast(value.size()))); + builder->UnsafeAppend(value); + return Status::OK(); +} + +static Status UnsafeAppend(StringBuilder* builder, util::string_view value) { + RETURN_NOT_OK(builder->ReserveData(static_cast(value.size()))); + builder->UnsafeAppend(value); + return Status::OK(); +} + +template +class Taker { + public: + Taker(const std::shared_ptr& type) : type_(type) {} + + virtual ~Taker() = default; + + virtual Status MakeChildren() { return Status::OK(); } + + virtual Status Init(MemoryPool* pool) = 0; + + virtual Status Take(const Array& values, IndexSequence indices) = 0; + + virtual Status Finish(std::shared_ptr*) = 0; + + static Status Make(const std::shared_ptr& type, std::unique_ptr* out); + + static_assert(std::is_copy_constructible::value, + "Index sequences must be copy constructible"); + + static_assert( + IndexSequence::take_null_index == std::numeric_limits::min(), + "Index sequences must declare a taken element as null with index == LONG_MIN"); + + static_assert( + std::is_same::value, + "Index sequences must declare whether bounds checking is necessary"); + + static_assert( + std::is_same().Next()), int64_t>::value, + "An index sequence must yield indices of type int64_t."); + + static_assert(std::is_same().length()), + int64_t>::value, + "An index sequence must provide its length."); + + static_assert(std::is_same().null_count()), + int64_t>::value, + "An index sequence must provide the number of nulls it will take."); + + protected: + Status BoundsCheck(const Array& values, int64_t index) { + if (IndexSequence::never_out_of_bounds) { + return Status::OK(); + } + if (index < 0 || index >= values.length()) { + return Status::IndexError("take index out of bounds"); + } + return Status::OK(); + } + + template + Status MakeBuilder(MemoryPool* pool, std::unique_ptr* out) { + std::unique_ptr builder; + RETURN_NOT_OK(arrow::MakeBuilder(pool, type_, &builder)); + out->reset(checked_cast(builder.release())); + return Status::OK(); + } + + std::shared_ptr type_; +}; + +class RangeIndexSequence { + public: + static constexpr int64_t take_null_index = std::numeric_limits::min(); + static constexpr bool never_out_of_bounds = true; + + RangeIndexSequence(int64_t offset, int64_t length) : index_(offset), length_(length) {} + + int64_t Next() { return index_++; } + + int64_t length() const { return length_; } + + int64_t null_count() const { return 0; } + + private: + int64_t index_ = 0, length_ = -1; +}; + +class RangeOrNullIndexSequence { + public: + static constexpr int64_t take_null_index = std::numeric_limits::min(); + static constexpr bool never_out_of_bounds = true; + + RangeOrNullIndexSequence(int64_t offset_or_null, int64_t length) + : index_(offset_or_null), length_(length) {} + + int64_t Next() { + if (index_ == take_null_index) { + return take_null_index; + } + return index_++; + } + + int64_t length() const { return length_; } + + int64_t null_count() const { return index_ == take_null_index ? length_ : 0; } + + private: + int64_t index_ = 0, length_ = -1; +}; + +template +class TakerImpl; + +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status Init(MemoryPool*) override { return Status::OK(); } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + + if (!IndexSequence::never_out_of_bounds) { + for (int64_t i = 0; i < indices.length(); ++i) { + int64_t index = indices.Next(); + if (index == IndexSequence::take_null_index) { + continue; + } + RETURN_NOT_OK(this->BoundsCheck(values, index)); + } + } + + length_ += indices.length(); + return Status::OK(); + } + + Status Finish(std::shared_ptr* out) override { + out->reset(new NullArray(length_)); + return Status::OK(); + } + + private: + int64_t length_ = 0; +}; + +template +class TakerImpl : public Taker { + public: + using ArrayType = typename TypeTraits::ArrayType; + using BuilderType = typename TypeTraits::BuilderType; + + using Taker::Taker; + + Status Init(MemoryPool* pool) override { return this->MakeBuilder(pool, &builder_); } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + RETURN_NOT_OK(builder_->Reserve(indices.length())); + + if (indices.null_count() == 0) { + if (values.null_count() == 0) { + return Take(values, indices); + } else { + return Take(values, indices); + } + } else { + if (values.null_count() == 0) { + return Take(values, indices); + } else { + return Take(values, indices); + } + } + } + + Status Finish(std::shared_ptr* out) override { return builder_->Finish(out); } + + private: + template + Status Take(const Array& values, IndexSequence indices) { + for (int64_t i = 0; i < indices.length(); ++i) { + int64_t index = indices.Next(); + + if (SomeIndicesNull && index == IndexSequence::take_null_index) { + builder_->UnsafeAppendNull(); + continue; + } + + if (SomeValuesNull && values.IsNull(index)) { + builder_->UnsafeAppendNull(); + continue; + } + + RETURN_NOT_OK(this->BoundsCheck(values, index)); + auto value = checked_cast(values).GetView(index); + RETURN_NOT_OK(UnsafeAppend(builder_.get(), value)); + } + return Status::OK(); + } + + std::unique_ptr builder_; +}; + +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status MakeChildren() override { + const auto& list_type = checked_cast(*this->type_); + return Taker::Make(list_type.value_type(), &value_taker_); + } + + Status Init(MemoryPool* pool) override { + null_bitmap_builder_.reset(new TypedBufferBuilder(pool)); + offset_builder_.reset(new TypedBufferBuilder(pool)); + return value_taker_->Init(pool); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + + const auto& list_array = checked_cast(values); + + RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); + RETURN_NOT_OK(offset_builder_->Reserve(indices.length() + 1)); + int32_t offset = 0; + offset_builder_->UnsafeAppend(offset); + + for (int64_t i = 0; i < indices.length(); ++i) { + int64_t index = indices.Next(); + + bool is_valid = index != IndexSequence::take_null_index && values.IsValid(index); + null_bitmap_builder_->UnsafeAppend(is_valid); + + if (is_valid) { + RETURN_NOT_OK(this->BoundsCheck(values, index)); + offset += list_array.value_length(index); + RangeIndexSequence value_indices(list_array.value_offset(index), + list_array.value_length(index)); + RETURN_NOT_OK(value_taker_->Take(*list_array.values(), value_indices)); + } + offset_builder_->UnsafeAppend(offset); + } + return Status::OK(); + } + + Status Finish(std::shared_ptr* out) override { return FinishAs(out); } + + protected: + template + Status FinishAs(std::shared_ptr* out) { + auto null_count = null_bitmap_builder_->false_count(); + auto length = null_bitmap_builder_->length(); + + std::shared_ptr offsets, null_bitmap; + RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); + RETURN_NOT_OK(offset_builder_->Finish(&offsets)); + + std::shared_ptr taken_values; + RETURN_NOT_OK(value_taker_->Finish(&taken_values)); + + out->reset( + new T(this->type_, length, offsets, taken_values, null_bitmap, null_count)); + return Status::OK(); + } + + std::unique_ptr> null_bitmap_builder_; + std::unique_ptr> offset_builder_; + std::unique_ptr> value_taker_; +}; + +template +class TakerImpl : public TakerImpl { + public: + using TakerImpl::TakerImpl; + + Status Finish(std::shared_ptr* out) override { + return this->template FinishAs(out); + } +}; + +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status MakeChildren() override { + const auto& list_type = checked_cast(*this->type_); + return Taker::Make(list_type.value_type(), &value_taker_); + } + + Status Init(MemoryPool* pool) override { + null_bitmap_builder_.reset(new TypedBufferBuilder(pool)); + return value_taker_->Init(pool); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + + const auto& list_array = checked_cast(values); + auto list_size = list_array.list_type()->list_size(); + + RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); + + for (int64_t i = 0; i < indices.length(); ++i) { + int64_t index = indices.Next(); + + bool is_valid = index != IndexSequence::take_null_index && values.IsValid(index); + null_bitmap_builder_->UnsafeAppend(is_valid); + + if (is_valid) { + RETURN_NOT_OK(this->BoundsCheck(values, index)); + } + RangeOrNullIndexSequence value_indices( + is_valid ? list_array.value_offset(index) : IndexSequence::take_null_index, + list_size); + RETURN_NOT_OK(value_taker_->Take(*list_array.values(), value_indices)); + } + return Status::OK(); + } + + Status Finish(std::shared_ptr* out) override { + auto null_count = null_bitmap_builder_->false_count(); + auto length = null_bitmap_builder_->length(); + + std::shared_ptr null_bitmap; + RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); + + std::shared_ptr taken_values; + RETURN_NOT_OK(value_taker_->Finish(&taken_values)); + + out->reset(new FixedSizeListArray(this->type_, length, taken_values, null_bitmap, + null_count)); + return Status::OK(); + } + + protected: + std::unique_ptr> null_bitmap_builder_; + std::unique_ptr> value_taker_; +}; + +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status MakeChildren() override { + children_.resize(this->type_->num_children()); + for (int i = 0; i < this->type_->num_children(); ++i) { + RETURN_NOT_OK( + Taker::Make(this->type_->child(i)->type(), &children_[i])); + } + return Status::OK(); + } + + Status Init(MemoryPool* pool) override { + null_bitmap_builder_.reset(new TypedBufferBuilder(pool)); + for (int i = 0; i < this->type_->num_children(); ++i) { + RETURN_NOT_OK(children_[i]->Init(pool)); + } + return Status::OK(); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + + const auto& struct_array = checked_cast(values); + for (int i = 0; i < this->type_->num_children(); ++i) { + RETURN_NOT_OK(children_[i]->Take(*struct_array.field(i), indices)); + } + // TODO(bkietz) each child is doing bounds checking; this only needs to happen once + + RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); + for (int64_t i = 0; i < indices.length(); ++i) { + int64_t index = indices.Next(); + null_bitmap_builder_->UnsafeAppend(index != IndexSequence::take_null_index && + values.IsValid(index)); + } + return Status::OK(); + } + + Status Finish(std::shared_ptr* out) override { + auto null_count = null_bitmap_builder_->false_count(); + auto length = null_bitmap_builder_->length(); + std::shared_ptr null_bitmap; + RETURN_NOT_OK(null_bitmap_builder_->Finish(&null_bitmap)); + + ArrayVector fields(this->type_->num_children()); + for (int i = 0; i < this->type_->num_children(); ++i) { + RETURN_NOT_OK(children_[i]->Finish(&fields[i])); + } + + out->reset( + new StructArray(this->type_, length, std::move(fields), null_bitmap, null_count)); + return Status::OK(); + } + + protected: + std::unique_ptr> null_bitmap_builder_; + std::vector>> children_; +}; + +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status MakeChildren() override { + const auto& dict_type = checked_cast(*this->type_); + return Taker::Make(dict_type.index_type(), &index_taker_); + } + + Status Init(MemoryPool* pool) override { + dictionary_ = nullptr; + return index_taker_->Init(pool); + } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + const auto& dict_array = checked_cast(values); + + if (dictionary_ != nullptr && dictionary_ != dict_array.dictionary()) { + return Status::NotImplemented( + "taking from DictionaryArrays with different dictionaries"); + } else { + dictionary_ = dict_array.dictionary(); + } + return index_taker_->Take(*dict_array.indices(), indices); + } + + Status Finish(std::shared_ptr* out) override { + std::shared_ptr taken_indices; + RETURN_NOT_OK(index_taker_->Finish(&taken_indices)); + out->reset(new DictionaryArray(this->type_, taken_indices, dictionary_)); + return Status::OK(); + } + + protected: + std::shared_ptr dictionary_; + std::unique_ptr> index_taker_; +}; + +template +class TakerImpl : public Taker { + public: + using Taker::Taker; + + Status MakeChildren() override { + const auto& ext_type = checked_cast(*this->type_); + return Taker::Make(ext_type.storage_type(), &storage_taker_); + } + + Status Init(MemoryPool* pool) override { return storage_taker_->Init(pool); } + + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + const auto& ext_array = checked_cast(values); + return storage_taker_->Take(*ext_array.storage(), indices); + } + + Status Finish(std::shared_ptr* out) override { + std::shared_ptr taken_storage; + RETURN_NOT_OK(storage_taker_->Finish(&taken_storage)); + out->reset(new ExtensionArray(this->type_, taken_storage)); + return Status::OK(); + } + + protected: + std::unique_ptr> storage_taker_; +}; + +template +struct TakerMakeImpl { + Status Visit(const NullType&) { return Make(); } + + template + typename std::enable_if::value, Status>::type + Visit(const Fixed&) { + return Make(); + } + + Status Visit(const BinaryType&) { return Make(); } + + Status Visit(const StringType&) { return Make(); } + + Status Visit(const ListType&) { return Make(); } + + Status Visit(const MapType&) { return Make(); } + + Status Visit(const FixedSizeListType&) { return Make(); } + + Status Visit(const StructType& t) { return Make(); } + + Status Visit(const DictionaryType& t) { return Make(); } + + Status Visit(const ExtensionType& t) { return Make(); } + + Status Visit(const DataType& t) { + return Status::NotImplemented("gathering values of type ", t); + } + + template + Status Make() { + out_->reset(new TakerImpl(type_)); + return (*out_)->MakeChildren(); + } + + std::shared_ptr type_; + std::unique_ptr>* out_; +}; + +template +Status Taker::Make(const std::shared_ptr& type, + std::unique_ptr* out) { + TakerMakeImpl visitor{type, out}; + return VisitTypeInline(*type, &visitor); +} + +} // namespace compute +} // namespace arrow + diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 17b054099ea..c85ee6edab8 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -18,8 +18,8 @@ #include #include -#include "arrow/builder.h" #include "arrow/compute/context.h" +#include "arrow/compute/kernels/take-internal.h" #include "arrow/compute/kernels/take.h" #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" @@ -30,200 +30,102 @@ namespace compute { using internal::checked_cast; -Status Take(FunctionContext* context, const Array& values, const Array& indices, - const TakeOptions& options, std::shared_ptr* out) { - Datum out_datum; - RETURN_NOT_OK( - Take(context, Datum(values.data()), Datum(indices.data()), options, &out_datum)); - *out = out_datum.make_array(); - return Status::OK(); -} - -Status Take(FunctionContext* context, const Datum& values, const Datum& indices, - const TakeOptions& options, Datum* out) { - TakeKernel kernel(values.type(), options); - RETURN_NOT_OK(kernel.Call(context, values, indices, out)); - return Status::OK(); -} - -struct TakeParameters { - FunctionContext* context; - std::shared_ptr values, indices; - TakeOptions options; - std::shared_ptr* out; -}; - -template -Status UnsafeAppend(Builder* builder, Scalar&& value) { - builder->UnsafeAppend(std::forward(value)); - return Status::OK(); -} - -Status UnsafeAppend(BinaryBuilder* builder, util::string_view value) { - RETURN_NOT_OK(builder->ReserveData(static_cast(value.size()))); - builder->UnsafeAppend(value); - return Status::OK(); -} - -Status UnsafeAppend(StringBuilder* builder, util::string_view value) { - RETURN_NOT_OK(builder->ReserveData(static_cast(value.size()))); - builder->UnsafeAppend(value); - return Status::OK(); -} - -template -Status TakeImpl(FunctionContext*, const ValueArray& values, const IndexArray& indices, - OutBuilder* builder) { - auto raw_indices = indices.raw_values(); - for (int64_t i = 0; i < indices.length(); ++i) { - if (!AllIndicesValid && indices.IsNull(i)) { - builder->UnsafeAppendNull(); - continue; - } - auto index = static_cast(raw_indices[i]); - if (index < 0 || index >= values.length()) { - return Status::IndexError("take index out of bounds"); - } - if (!AllValuesValid && values.IsNull(index)) { - builder->UnsafeAppendNull(); - continue; - } - RETURN_NOT_OK(UnsafeAppend(builder, values.GetView(index))); - } - return Status::OK(); -} - -template -Status UnpackIndicesNullCount(FunctionContext* context, const ValueArray& values, - const IndexArray& indices, OutBuilder* builder) { - if (indices.null_count() == 0) { - return TakeImpl(context, values, indices, builder); - } - return TakeImpl(context, values, indices, builder); -} - -template -Status UnpackValuesNullCount(FunctionContext* context, const ValueArray& values, - const IndexArray& indices, OutBuilder* builder) { - if (values.null_count() == 0) { - return UnpackIndicesNullCount(context, values, indices, builder); - } - return UnpackIndicesNullCount(context, values, indices, builder); -} - template -struct UnpackValues { - using IndexArrayRef = const typename TypeTraits::ArrayType&; - - template - Status Visit(const ValueType&) { - using ValueArrayRef = const typename TypeTraits::ArrayType&; - using OutBuilder = typename TypeTraits::BuilderType; - IndexArrayRef indices = checked_cast(*params_.indices); - ValueArrayRef values = checked_cast(*params_.values); - std::unique_ptr builder; - RETURN_NOT_OK(MakeBuilder(params_.context->memory_pool(), values.type(), &builder)); - RETURN_NOT_OK(builder->Reserve(indices.length())); - RETURN_NOT_OK(UnpackValuesNullCount(params_.context, values, indices, - checked_cast(builder.get()))); - return builder->Finish(params_.out); - } - - Status Visit(const NullType& t) { - auto indices_length = params_.indices->length(); - if (indices_length != 0) { - auto indices = checked_cast(*params_.indices).raw_values(); - auto minmax = std::minmax_element(indices, indices + indices_length); - auto min = static_cast(*minmax.first); - auto max = static_cast(*minmax.second); - if (min < 0 || max >= params_.values->length()) { - return Status::IndexError("take index out of bounds"); - } +class ArrayIndexSequence { + public: + static constexpr int64_t take_null_index = std::numeric_limits::min(); + static constexpr bool never_out_of_bounds = false; + + ArrayIndexSequence(const Array& indices) + : indices_(&checked_cast&>(indices)) {} + + int64_t Next() { + if (indices_->IsNull(index_)) { + ++index_; + return take_null_index; } - params_.out->reset(new NullArray(indices_length)); - return Status::OK(); + return static_cast(indices_->Value(index_++)); } - Status Visit(const DictionaryType& t) { - std::shared_ptr taken_indices; - const auto& values = internal::checked_cast(*params_.values); - { - // To take from a dictionary, apply the current kernel to the dictionary's - // indices. (Use UnpackValues since IndexType is already unpacked) - auto indices = values.indices(); - TakeParameters params = params_; - params.values = indices; - params.out = &taken_indices; - UnpackValues unpack = {params}; - RETURN_NOT_OK(VisitTypeInline(*t.index_type(), &unpack)); - } - // create output dictionary from taken indices - *params_.out = std::make_shared(values.type(), taken_indices, - values.dictionary()); - return Status::OK(); - } + int64_t length() const { return indices_->length(); } - Status Visit(const ExtensionType& t) { - // XXX can we just take from its storage? - return Status::NotImplemented("gathering values of type ", t); - } + int64_t null_count() const { return indices_->null_count(); } - Status Visit(const UnionType& t) { - return Status::NotImplemented("gathering values of type ", t); - } - - Status Visit(const ListType& t) { - return Status::NotImplemented("gathering values of type ", t); - } + private: + const NumericArray* indices_; + int64_t index_ = 0; +}; - Status Visit(const MapType& t) { - return Status::NotImplemented("gathering values of type ", t); - } +template +class TakeKernelImpl : public TakeKernel { + public: + TakeKernelImpl(const std::shared_ptr& value_type) : TakeKernel(value_type) {} - Status Visit(const FixedSizeListType& t) { - return Status::NotImplemented("gathering values of type ", t); + Status Init() { + return Taker>::Make(this->type_, &taker_); } - Status Visit(const StructType& t) { - return Status::NotImplemented("gathering values of type ", t); + Status Take(FunctionContext* ctx, const Array& values, const Array& indices_array, + std::shared_ptr* out) override { + RETURN_NOT_OK(taker_->Init(ctx->memory_pool())); + RETURN_NOT_OK(taker_->Take(values, ArrayIndexSequence(indices_array))); + return taker_->Finish(out); } - const TakeParameters& params_; + std::unique_ptr>> taker_; }; struct UnpackIndices { template enable_if_integer Visit(const IndexType&) { - UnpackValues unpack = {params_}; - return VisitTypeInline(*params_.values->type(), &unpack); + auto out = new TakeKernelImpl(value_type_); + out_->reset(out); + return out->Init(); } Status Visit(const DataType& other) { return Status::TypeError("index type not supported: ", other); } - const TakeParameters& params_; + std::shared_ptr value_type_; + std::unique_ptr* out_; }; +Status TakeKernel::Make(const std::shared_ptr& value_type, + const std::shared_ptr& index_type, + std::unique_ptr* out) { + UnpackIndices visitor{value_type, out}; + return VisitTypeInline(*index_type, &visitor); +} + Status TakeKernel::Call(FunctionContext* ctx, const Datum& values, const Datum& indices, Datum* out) { if (!values.is_array() || !indices.is_array()) { return Status::Invalid("TakeKernel expects array values and indices"); } + auto values_array = values.make_array(); + auto indices_array = indices.make_array(); std::shared_ptr out_array; - TakeParameters params; - params.context = ctx; - params.values = values.make_array(); - params.indices = indices.make_array(); - params.options = options_; - params.out = &out_array; - UnpackIndices unpack = {params}; - RETURN_NOT_OK(VisitTypeInline(*indices.type(), &unpack)); + RETURN_NOT_OK(Take(ctx, *values_array, *indices_array, &out_array)); *out = Datum(out_array); return Status::OK(); } +Status Take(FunctionContext* ctx, const Array& values, const Array& indices, + const TakeOptions& options, std::shared_ptr* out) { + Datum out_datum; + RETURN_NOT_OK( + Take(ctx, Datum(values.data()), Datum(indices.data()), options, &out_datum)); + *out = out_datum.make_array(); + return Status::OK(); +} + +Status Take(FunctionContext* ctx, const Datum& values, const Datum& indices, + const TakeOptions& options, Datum* out) { + std::unique_ptr kernel; + RETURN_NOT_OK(TakeKernel::Make(values.type(), indices.type(), &kernel)); + return kernel->Call(ctx, values, indices, out); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/take.h b/cpp/src/arrow/compute/kernels/take.h index 3aa5ed5eedf..d0d08513f19 100644 --- a/cpp/src/arrow/compute/kernels/take.h +++ b/cpp/src/arrow/compute/kernels/take.h @@ -44,40 +44,46 @@ struct ARROW_EXPORT TakeOptions {}; /// = [values[2], values[1], null, values[3]] /// = ["c", "b", null, null] /// -/// \param[in] context the FunctionContext +/// \param[in] ctx the FunctionContext /// \param[in] values array from which to take /// \param[in] indices which values to take /// \param[in] options options /// \param[out] out resulting array ARROW_EXPORT -Status Take(FunctionContext* context, const Array& values, const Array& indices, +Status Take(FunctionContext* ctx, const Array& values, const Array& indices, const TakeOptions& options, std::shared_ptr* out); /// \brief Take from an array of values at indices in another array /// -/// \param[in] context the FunctionContext +/// \param[in] ctx the FunctionContext /// \param[in] values datum from which to take /// \param[in] indices which values to take /// \param[in] options options /// \param[out] out resulting datum ARROW_EXPORT -Status Take(FunctionContext* context, const Datum& values, const Datum& indices, +Status Take(FunctionContext* ctx, const Datum& values, const Datum& indices, const TakeOptions& options, Datum* out); /// \brief BinaryKernel implementing Take operation class ARROW_EXPORT TakeKernel : public BinaryKernel { public: explicit TakeKernel(const std::shared_ptr& type, TakeOptions options = {}) - : type_(type), options_(options) {} + : type_(type) {} Status Call(FunctionContext* ctx, const Datum& values, const Datum& indices, Datum* out) override; std::shared_ptr out_type() const override { return type_; } - private: + static Status Make(const std::shared_ptr& value_type, + const std::shared_ptr& index_type, + std::unique_ptr* out); + + virtual Status Take(FunctionContext* ctx, const Array& values, const Array& indices, + std::shared_ptr* out) = 0; + + protected: std::shared_ptr type_; - TakeOptions options_; }; } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/util-internal.h b/cpp/src/arrow/compute/kernels/util-internal.h index efd990f128c..c8325833d9f 100644 --- a/cpp/src/arrow/compute/kernels/util-internal.h +++ b/cpp/src/arrow/compute/kernels/util-internal.h @@ -131,7 +131,7 @@ class ARROW_EXPORT PrimitiveAllocatingUnaryKernel : public UnaryKernel { /// \brief Kernel used to preallocate outputs for primitive types. class ARROW_EXPORT PrimitiveAllocatingBinaryKernel : public BinaryKernel { public: - // \brief Construct with a kernel to delegate operatoions to. + // \brief Construct with a kernel to delegate operations to. // // Ownership is not taken of the delegate kernel, it must outlive // the life time of this object. From 227ea55167c53dd2409113589a8fad4f0be1be74 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Sat, 22 Jun 2019 13:52:55 -0400 Subject: [PATCH 02/18] add tests for Take(nested types) --- cpp/src/arrow/compute/kernels/filter-test.cc | 10 +- cpp/src/arrow/compute/kernels/take-test.cc | 223 +++++++++++++++---- 2 files changed, 183 insertions(+), 50 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter-test.cc b/cpp/src/arrow/compute/kernels/filter-test.cc index 7b349492b1d..f88b23b959c 100644 --- a/cpp/src/arrow/compute/kernels/filter-test.cc +++ b/cpp/src/arrow/compute/kernels/filter-test.cc @@ -34,6 +34,8 @@ namespace compute { using internal::checked_pointer_cast; using util::string_view; +constexpr auto kSeed = 0x0ff1ce; + template class TestFilterKernel : public ComputeFixture, public TestBase { protected: @@ -138,7 +140,7 @@ TYPED_TEST(TestFilterKernelWithNumeric, FilterNumeric) { } TYPED_TEST(TestFilterKernelWithNumeric, FilterRandomNumeric) { - auto rand = random::RandomArrayGenerator(0x5416447); + auto rand = random::RandomArrayGenerator(kSeed); for (size_t i = 3; i < 13; i++) { const int64_t length = static_cast(1ULL << i); for (auto null_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) { @@ -191,7 +193,7 @@ TYPED_TEST(TestFilterKernelWithNumeric, CompareScalarAndFilterRandomNumeric) { using ArrayType = typename TypeTraits::ArrayType; using CType = typename TypeTraits::CType; - auto rand = random::RandomArrayGenerator(0x5416447); + auto rand = random::RandomArrayGenerator(kSeed); for (size_t i = 3; i < 13; i++) { const int64_t length = static_cast(1ULL << i); // TODO(bkietz) rewrite with some nulls @@ -216,7 +218,7 @@ TYPED_TEST(TestFilterKernelWithNumeric, CompareScalarAndFilterRandomNumeric) { TYPED_TEST(TestFilterKernelWithNumeric, CompareArrayAndFilterRandomNumeric) { using ArrayType = typename TypeTraits::ArrayType; - auto rand = random::RandomArrayGenerator(0x5416447); + auto rand = random::RandomArrayGenerator(kSeed); for (size_t i = 3; i < 13; i++) { const int64_t length = static_cast(1ULL << i); auto lhs = @@ -242,7 +244,7 @@ TYPED_TEST(TestFilterKernelWithNumeric, ScalarInRangeAndFilterRandomNumeric) { using ArrayType = typename TypeTraits::ArrayType; using CType = typename TypeTraits::CType; - auto rand = random::RandomArrayGenerator(0x5416447); + auto rand = random::RandomArrayGenerator(kSeed); for (size_t i = 3; i < 13; i++) { const int64_t length = static_cast(1ULL << i); auto array = diff --git a/cpp/src/arrow/compute/kernels/take-test.cc b/cpp/src/arrow/compute/kernels/take-test.cc index c61aedab5b9..8d5d1fcc665 100644 --- a/cpp/src/arrow/compute/kernels/take-test.cc +++ b/cpp/src/arrow/compute/kernels/take-test.cc @@ -29,31 +29,35 @@ namespace arrow { namespace compute { +using internal::checked_pointer_cast; using util::string_view; +constexpr auto kSeed = 0x0ff1ce; + template class TestTakeKernel : public ComputeFixture, public TestBase { protected: void AssertTakeArrays(const std::shared_ptr& values, - const std::shared_ptr& indices, TakeOptions options, + const std::shared_ptr& indices, const std::shared_ptr& expected) { std::shared_ptr actual; + TakeOptions options; ASSERT_OK(arrow::compute::Take(&this->ctx_, *values, *indices, options, &actual)); AssertArraysEqual(*expected, *actual); } void AssertTake(const std::shared_ptr& type, const std::string& values, - const std::string& indices, TakeOptions options, - const std::string& expected) { + const std::string& indices, const std::string& expected) { std::shared_ptr actual; for (auto index_type : {int8(), uint32()}) { - ASSERT_OK(this->Take(type, values, index_type, indices, options, &actual)); + ASSERT_OK(this->Take(type, values, index_type, indices, &actual)); AssertArraysEqual(*ArrayFromJSON(type, expected), *actual); } } Status Take(const std::shared_ptr& type, const std::string& values, const std::shared_ptr& index_type, const std::string& indices, - TakeOptions options, std::shared_ptr* out) { + std::shared_ptr* out) { + TakeOptions options; return arrow::compute::Take(&this->ctx_, *ArrayFromJSON(type, values), *ArrayFromJSON(index_type, indices), options, out); } @@ -62,82 +66,110 @@ class TestTakeKernel : public ComputeFixture, public TestBase { class TestTakeKernelWithNull : public TestTakeKernel { protected: void AssertTake(const std::string& values, const std::string& indices, - TakeOptions options, const std::string& expected) { - TestTakeKernel::AssertTake(utf8(), values, indices, options, expected); + const std::string& expected) { + TestTakeKernel::AssertTake(utf8(), values, indices, expected); } }; TEST_F(TestTakeKernelWithNull, TakeNull) { - TakeOptions options; - this->AssertTake("[null, null, null]", "[0, 1, 0]", options, "[null, null, null]"); + this->AssertTake("[null, null, null]", "[0, 1, 0]", "[null, null, null]"); std::shared_ptr arr; - ASSERT_RAISES(IndexError, this->Take(null(), "[null, null, null]", int8(), "[0, 9, 0]", - options, &arr)); + ASSERT_RAISES(IndexError, + this->Take(null(), "[null, null, null]", int8(), "[0, 9, 0]", &arr)); } TEST_F(TestTakeKernelWithNull, InvalidIndexType) { - TakeOptions options; std::shared_ptr arr; ASSERT_RAISES(TypeError, this->Take(null(), "[null, null, null]", float32(), - "[0.0, 1.0, 0.1]", options, &arr)); + "[0.0, 1.0, 0.1]", &arr)); } class TestTakeKernelWithBoolean : public TestTakeKernel { protected: void AssertTake(const std::string& values, const std::string& indices, - TakeOptions options, const std::string& expected) { - TestTakeKernel::AssertTake(boolean(), values, indices, options, - expected); + const std::string& expected) { + TestTakeKernel::AssertTake(boolean(), values, indices, expected); } }; TEST_F(TestTakeKernelWithBoolean, TakeBoolean) { - TakeOptions options; - this->AssertTake("[true, false, true]", "[0, 1, 0]", options, "[true, false, true]"); - this->AssertTake("[null, false, true]", "[0, 1, 0]", options, "[null, false, null]"); - this->AssertTake("[true, false, true]", "[null, 1, 0]", options, "[null, false, true]"); + this->AssertTake("[true, false, true]", "[0, 1, 0]", "[true, false, true]"); + this->AssertTake("[null, false, true]", "[0, 1, 0]", "[null, false, null]"); + this->AssertTake("[true, false, true]", "[null, 1, 0]", "[null, false, true]"); std::shared_ptr arr; - ASSERT_RAISES(IndexError, this->Take(boolean(), "[true, false, true]", int8(), - "[0, 9, 0]", options, &arr)); + ASSERT_RAISES(IndexError, + this->Take(boolean(), "[true, false, true]", int8(), "[0, 9, 0]", &arr)); } template class TestTakeKernelWithNumeric : public TestTakeKernel { protected: void AssertTake(const std::string& values, const std::string& indices, - TakeOptions options, const std::string& expected) { - TestTakeKernel::AssertTake(type_singleton(), values, indices, options, - expected); + const std::string& expected) { + TestTakeKernel::AssertTake(type_singleton(), values, indices, expected); } std::shared_ptr type_singleton() { return TypeTraits::type_singleton(); } + void ValidateTake(const std::shared_ptr& values, + const std::shared_ptr& indices_boxed) { + std::shared_ptr taken; + TakeOptions options; + ASSERT_OK( + arrow::compute::Take(&this->ctx_, *values, *indices_boxed, options, &taken)); + ASSERT_EQ(indices_boxed->length(), taken->length()); + + ASSERT_EQ(indices_boxed->type_id(), Type::INT32); + auto indices = checked_pointer_cast(indices_boxed); + for (int64_t i = 0; i < indices->length(); ++i) { + if (indices->IsNull(i)) { + ASSERT_TRUE(taken->IsNull(i)); + continue; + } + int32_t taken_index = indices->Value(i); + ASSERT_TRUE(values->RangeEquals(taken_index, taken_index + 1, i, taken)); + } + } }; TYPED_TEST_CASE(TestTakeKernelWithNumeric, NumericArrowTypes); TYPED_TEST(TestTakeKernelWithNumeric, TakeNumeric) { - TakeOptions options; - this->AssertTake("[7, 8, 9]", "[0, 1, 0]", options, "[7, 8, 7]"); - this->AssertTake("[null, 8, 9]", "[0, 1, 0]", options, "[null, 8, null]"); - this->AssertTake("[7, 8, 9]", "[null, 1, 0]", options, "[null, 8, 7]"); - this->AssertTake("[null, 8, 9]", "[]", options, "[]"); + this->AssertTake("[7, 8, 9]", "[0, 1, 0]", "[7, 8, 7]"); + this->AssertTake("[null, 8, 9]", "[0, 1, 0]", "[null, 8, null]"); + this->AssertTake("[7, 8, 9]", "[null, 1, 0]", "[null, 8, 7]"); + this->AssertTake("[null, 8, 9]", "[]", "[]"); std::shared_ptr arr; ASSERT_RAISES(IndexError, this->Take(this->type_singleton(), "[7, 8, 9]", int8(), - "[0, 9, 0]", options, &arr)); + "[0, 9, 0]", &arr)); +} + +TYPED_TEST(TestTakeKernelWithNumeric, TakeRandomNumeric) { + auto rand = random::RandomArrayGenerator(kSeed); + for (size_t i = 3; i < 8; i++) { + const int64_t length = static_cast(1ULL << i); + for (size_t j = 0; j < 13; j++) { + const int64_t indices_length = static_cast(1ULL << j); + for (auto null_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) { + auto values = rand.Numeric(length, 0, 127, null_probability); + auto filter = rand.Int32(indices_length, 0, length - 1, null_probability); + this->ValidateTake(values, filter); + } + } + } } class TestTakeKernelWithString : public TestTakeKernel { protected: void AssertTake(const std::string& values, const std::string& indices, - TakeOptions options, const std::string& expected) { - TestTakeKernel::AssertTake(utf8(), values, indices, options, expected); + const std::string& expected) { + TestTakeKernel::AssertTake(utf8(), values, indices, expected); } void AssertTakeDictionary(const std::string& dictionary_values, const std::string& dictionary_indices, - const std::string& indices, TakeOptions options, + const std::string& indices, const std::string& expected_indices) { auto dict = ArrayFromJSON(utf8(), dictionary_values); auto type = dictionary(int8(), utf8()); @@ -147,28 +179,127 @@ class TestTakeKernelWithString : public TestTakeKernel { ASSERT_OK(DictionaryArray::FromArrays(type, ArrayFromJSON(int8(), expected_indices), dict, &expected)); auto take_indices = ArrayFromJSON(int8(), indices); - this->AssertTakeArrays(values, take_indices, options, expected); + this->AssertTakeArrays(values, take_indices, expected); } }; TEST_F(TestTakeKernelWithString, TakeString) { - TakeOptions options; - this->AssertTake(R"(["a", "b", "c"])", "[0, 1, 0]", options, R"(["a", "b", "a"])"); - this->AssertTake(R"([null, "b", "c"])", "[0, 1, 0]", options, "[null, \"b\", null]"); - this->AssertTake(R"(["a", "b", "c"])", "[null, 1, 0]", options, R"([null, "b", "a"])"); + this->AssertTake(R"(["a", "b", "c"])", "[0, 1, 0]", R"(["a", "b", "a"])"); + this->AssertTake(R"([null, "b", "c"])", "[0, 1, 0]", "[null, \"b\", null]"); + this->AssertTake(R"(["a", "b", "c"])", "[null, 1, 0]", R"([null, "b", "a"])"); std::shared_ptr arr; - ASSERT_RAISES(IndexError, this->Take(utf8(), R"(["a", "b", "c"])", int8(), "[0, 9, 0]", - options, &arr)); + ASSERT_RAISES(IndexError, + this->Take(utf8(), R"(["a", "b", "c"])", int8(), "[0, 9, 0]", &arr)); } TEST_F(TestTakeKernelWithString, TakeDictionary) { - TakeOptions options; auto dict = R"(["a", "b", "c", "d", "e"])"; - this->AssertTakeDictionary(dict, "[3, 4, 2]", "[0, 1, 0]", options, "[3, 4, 3]"); - this->AssertTakeDictionary(dict, "[null, 4, 2]", "[0, 1, 0]", options, - "[null, 4, null]"); - this->AssertTakeDictionary(dict, "[3, 4, 2]", "[null, 1, 0]", options, "[null, 4, 3]"); + this->AssertTakeDictionary(dict, "[3, 4, 2]", "[0, 1, 0]", "[3, 4, 3]"); + this->AssertTakeDictionary(dict, "[null, 4, 2]", "[0, 1, 0]", "[null, 4, null]"); + this->AssertTakeDictionary(dict, "[3, 4, 2]", "[null, 1, 0]", "[null, 4, 3]"); +} + +class TestTakeKernelWithList : public TestTakeKernel {}; + +TEST_F(TestTakeKernelWithList, TakeListInt32) { + std::string list_json = "[[], [1,2], null, [3]]"; + this->AssertTake(list(int32()), list_json, "[]", "[]"); + this->AssertTake(list(int32()), list_json, "[3, 2, 1]", "[[3], null, [1,2]]"); + this->AssertTake(list(int32()), list_json, "[null, 3, 0]", "[null, [3], []]"); + this->AssertTake(list(int32()), list_json, "[null, null]", "[null, null]"); + this->AssertTake(list(int32()), list_json, "[3, 0, 0, 3]", "[[3], [], [], [3]]"); + this->AssertTake(list(int32()), list_json, "[0, 1, 2, 3]", list_json); + this->AssertTake(list(int32()), list_json, "[0, 0, 0, 0, 0, 0, 1]", + "[[], [], [], [], [], [], [1, 2]]"); +} + +class TestTakeKernelWithFixedSizeList : public TestTakeKernel {}; + +TEST_F(TestTakeKernelWithFixedSizeList, TakeFixedSizeListInt32) { + std::string list_json = "[null, [1, null, 3], [4, 5, 6], [7, 8, null]]"; + this->AssertTake(fixed_size_list(int32(), 3), list_json, "[]", "[]"); + this->AssertTake(fixed_size_list(int32(), 3), list_json, "[3, 2, 1]", + "[[7, 8, null], [4, 5, 6], [1, null, 3]]"); + this->AssertTake(fixed_size_list(int32(), 3), list_json, "[null, 2, 0]", + "[null, [4, 5, 6], null]"); + this->AssertTake(fixed_size_list(int32(), 3), list_json, "[null, null]", + "[null, null]"); + this->AssertTake(fixed_size_list(int32(), 3), list_json, "[3, 0, 0, 3]", + "[[7, 8, null], null, null, [7, 8, null]]"); + this->AssertTake(fixed_size_list(int32(), 3), list_json, "[0, 1, 2, 3]", list_json); + this->AssertTake( + fixed_size_list(int32(), 3), list_json, "[2, 2, 2, 2, 2, 2, 1]", + "[[4, 5, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6], [1, null, 3]]"); +} + +class TestTakeKernelWithMap : public TestTakeKernel {}; + +TEST_F(TestTakeKernelWithMap, TakeMapStringToInt32) { + std::string map_json = R"([ + [["joe", 0], ["mark", null]], + null, + [["cap", 8]], + [] + ])"; + this->AssertTake(map(utf8(), int32()), map_json, "[]", "[]"); + this->AssertTake(map(utf8(), int32()), map_json, "[3, 1, 3, 1, 3]", + "[[], null, [], null, []]"); + this->AssertTake(map(utf8(), int32()), map_json, "[2, 1, null]", R"([ + [["cap", 8]], + null, + null + ])"); + this->AssertTake(map(utf8(), int32()), map_json, "[2, 1, 0]", R"([ + [["cap", 8]], + null, + [["joe", 0], ["mark", null]] + ])"); + this->AssertTake(map(utf8(), int32()), map_json, "[0, 1, 2, 3]", map_json); + this->AssertTake(map(utf8(), int32()), map_json, "[0, 0, 0, 0, 0, 0, 3]", R"([ + [["joe", 0], ["mark", null]], + [["joe", 0], ["mark", null]], + [["joe", 0], ["mark", null]], + [["joe", 0], ["mark", null]], + [["joe", 0], ["mark", null]], + [["joe", 0], ["mark", null]], + [] + ])"); +} + +class TestTakeKernelWithStruct : public TestTakeKernel {}; + +TEST_F(TestTakeKernelWithStruct, TakeStruct) { + auto struct_type = struct_({field("a", int32()), field("b", utf8())}); + auto struct_json = R"([ + null, + {"a": 1, "b": ""}, + {"a": 2, "b": "hello"}, + {"a": 4, "b": "eh"} + ])"; + this->AssertTake(struct_type, struct_json, "[]", "[]"); + this->AssertTake(struct_type, struct_json, "[3, 1, 3, 1, 3]", R"([ + {"a": 4, "b": "eh"}, + {"a": 1, "b": ""}, + {"a": 4, "b": "eh"}, + {"a": 1, "b": ""}, + {"a": 4, "b": "eh"} + ])"); + this->AssertTake(struct_type, struct_json, "[3, 1, 0]", R"([ + {"a": 4, "b": "eh"}, + {"a": 1, "b": ""}, + null + ])"); + this->AssertTake(struct_type, struct_json, "[0, 1, 2, 3]", struct_json); + this->AssertTake(struct_type, struct_json, "[0, 2, 2, 2, 2, 2, 2]", R"([ + null, + {"a": 2, "b": "hello"}, + {"a": 2, "b": "hello"}, + {"a": 2, "b": "hello"}, + {"a": 2, "b": "hello"}, + {"a": 2, "b": "hello"}, + {"a": 2, "b": "hello"} + ])"); } } // namespace compute From 6c453f334291e65b564db4bb10c333ab19661816 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Sat, 22 Jun 2019 13:59:11 -0400 Subject: [PATCH 03/18] lint fixes --- cpp/src/arrow/compute/kernels/filter.cc | 1 + cpp/src/arrow/compute/kernels/take-internal.h | 3 ++- cpp/src/arrow/compute/kernels/take.cc | 6 ++++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 965f4cbbf35..40a0835901d 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -17,6 +17,7 @@ #include "arrow/compute/kernels/filter.h" +#include #include #include diff --git a/cpp/src/arrow/compute/kernels/take-internal.h b/cpp/src/arrow/compute/kernels/take-internal.h index aec8ba5bb8b..ba67ab64dbf 100644 --- a/cpp/src/arrow/compute/kernels/take-internal.h +++ b/cpp/src/arrow/compute/kernels/take-internal.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include #include @@ -57,7 +58,7 @@ static Status UnsafeAppend(StringBuilder* builder, util::string_view value) { template class Taker { public: - Taker(const std::shared_ptr& type) : type_(type) {} + explicit Taker(const std::shared_ptr& type) : type_(type) {} virtual ~Taker() = default; diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index c85ee6edab8..06aab044339 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include @@ -36,7 +37,7 @@ class ArrayIndexSequence { static constexpr int64_t take_null_index = std::numeric_limits::min(); static constexpr bool never_out_of_bounds = false; - ArrayIndexSequence(const Array& indices) + explicit ArrayIndexSequence(const Array& indices) : indices_(&checked_cast&>(indices)) {} int64_t Next() { @@ -59,7 +60,8 @@ class ArrayIndexSequence { template class TakeKernelImpl : public TakeKernel { public: - TakeKernelImpl(const std::shared_ptr& value_type) : TakeKernel(value_type) {} + explicit TakeKernelImpl(const std::shared_ptr& value_type) + : TakeKernel(value_type) {} Status Init() { return Taker>::Make(this->type_, &taker_); From 6a14c93e83a6440ca740c56fbadbc898438d91f0 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Sun, 23 Jun 2019 22:25:57 -0400 Subject: [PATCH 04/18] clang-format, explicit cast --- cpp/src/arrow/compute/kernels/take-internal.h | 1 - cpp/src/arrow/compute/kernels/take-test.cc | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/take-internal.h b/cpp/src/arrow/compute/kernels/take-internal.h index ba67ab64dbf..09082ea742d 100644 --- a/cpp/src/arrow/compute/kernels/take-internal.h +++ b/cpp/src/arrow/compute/kernels/take-internal.h @@ -568,4 +568,3 @@ Status Taker::Make(const std::shared_ptr& type, } // namespace compute } // namespace arrow - diff --git a/cpp/src/arrow/compute/kernels/take-test.cc b/cpp/src/arrow/compute/kernels/take-test.cc index 8d5d1fcc665..d8b864fe30b 100644 --- a/cpp/src/arrow/compute/kernels/take-test.cc +++ b/cpp/src/arrow/compute/kernels/take-test.cc @@ -29,6 +29,7 @@ namespace arrow { namespace compute { +using internal::checked_cast; using internal::checked_pointer_cast; using util::string_view; @@ -154,7 +155,8 @@ TYPED_TEST(TestTakeKernelWithNumeric, TakeRandomNumeric) { const int64_t indices_length = static_cast(1ULL << j); for (auto null_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) { auto values = rand.Numeric(length, 0, 127, null_probability); - auto filter = rand.Int32(indices_length, 0, length - 1, null_probability); + auto max_index = static_cast(length - 1); + auto filter = rand.Int32(indices_length, 0, max_index, null_probability); this->ValidateTake(values, filter); } } From c7f2e4021b331b26ac1e1aedd150918ee4072dad Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Sun, 23 Jun 2019 22:57:34 -0400 Subject: [PATCH 05/18] repair bounds checking --- cpp/src/arrow/compute/kernels/take-internal.h | 35 ++++++++++--------- cpp/src/arrow/compute/kernels/take-test.cc | 2 ++ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/take-internal.h b/cpp/src/arrow/compute/kernels/take-internal.h index 09082ea742d..83aa928eddb 100644 --- a/cpp/src/arrow/compute/kernels/take-internal.h +++ b/cpp/src/arrow/compute/kernels/take-internal.h @@ -96,16 +96,26 @@ class Taker { "An index sequence must provide the number of nulls it will take."); protected: + Status OutOfBounds() { return Status::IndexError("take index out of bounds"); } + Status BoundsCheck(const Array& values, int64_t index) { if (IndexSequence::never_out_of_bounds) { return Status::OK(); } + if (index == IndexSequence::take_null_index) { + return Status::OK(); + } if (index < 0 || index >= values.length()) { - return Status::IndexError("take index out of bounds"); + return OutOfBounds(); } return Status::OK(); } + Status BoundsCheckedNext(const Array& values, IndexSequence* indices, int64_t* index) { + *index = indices->Next(); + return BoundsCheck(values, *index); + } + template Status MakeBuilder(MemoryPool* pool, std::unique_ptr* out) { std::unique_ptr builder; @@ -171,12 +181,8 @@ class TakerImpl : public Taker { DCHECK(this->type_->Equals(values.type())); if (!IndexSequence::never_out_of_bounds) { - for (int64_t i = 0; i < indices.length(); ++i) { - int64_t index = indices.Next(); - if (index == IndexSequence::take_null_index) { - continue; - } - RETURN_NOT_OK(this->BoundsCheck(values, index)); + for (int64_t index, i = 0; i < indices.length(); ++i) { + RETURN_NOT_OK(this->BoundsCheckedNext(values, &indices, &index)); } } @@ -235,12 +241,13 @@ class TakerImpl : public Taker { continue; } + RETURN_NOT_OK(this->BoundsCheck(values, index)); + if (SomeValuesNull && values.IsNull(index)) { builder_->UnsafeAppendNull(); continue; } - RETURN_NOT_OK(this->BoundsCheck(values, index)); auto value = checked_cast(values).GetView(index); RETURN_NOT_OK(UnsafeAppend(builder_.get(), value)); } @@ -276,14 +283,13 @@ class TakerImpl : public Taker { int32_t offset = 0; offset_builder_->UnsafeAppend(offset); - for (int64_t i = 0; i < indices.length(); ++i) { - int64_t index = indices.Next(); + for (int64_t index, i = 0; i < indices.length(); ++i) { + RETURN_NOT_OK(this->BoundsCheckedNext(values, &indices, &index)); bool is_valid = index != IndexSequence::take_null_index && values.IsValid(index); null_bitmap_builder_->UnsafeAppend(is_valid); if (is_valid) { - RETURN_NOT_OK(this->BoundsCheck(values, index)); offset += list_array.value_length(index); RangeIndexSequence value_indices(list_array.value_offset(index), list_array.value_length(index)); @@ -352,15 +358,12 @@ class TakerImpl : public Taker RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); - for (int64_t i = 0; i < indices.length(); ++i) { - int64_t index = indices.Next(); + for (int64_t index, i = 0; i < indices.length(); ++i) { + RETURN_NOT_OK(this->BoundsCheckedNext(values, &indices, &index)); bool is_valid = index != IndexSequence::take_null_index && values.IsValid(index); null_bitmap_builder_->UnsafeAppend(is_valid); - if (is_valid) { - RETURN_NOT_OK(this->BoundsCheck(values, index)); - } RangeOrNullIndexSequence value_indices( is_valid ? list_array.value_offset(index) : IndexSequence::take_null_index, list_size); diff --git a/cpp/src/arrow/compute/kernels/take-test.cc b/cpp/src/arrow/compute/kernels/take-test.cc index d8b864fe30b..92971882b84 100644 --- a/cpp/src/arrow/compute/kernels/take-test.cc +++ b/cpp/src/arrow/compute/kernels/take-test.cc @@ -193,6 +193,8 @@ TEST_F(TestTakeKernelWithString, TakeString) { std::shared_ptr arr; ASSERT_RAISES(IndexError, this->Take(utf8(), R"(["a", "b", "c"])", int8(), "[0, 9, 0]", &arr)); + ASSERT_RAISES(IndexError, this->Take(utf8(), R"(["a", "b", null, "ddd", "ee"])", + int64(), "[2, 5]", &arr)); } TEST_F(TestTakeKernelWithString, TakeDictionary) { From c3e812982dfe27d2df80ee961b026611c55e1f04 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 24 Jun 2019 07:16:56 -0400 Subject: [PATCH 06/18] rewrite python Take() test --- python/pyarrow/tests/test_compute.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index 655dd38d1f0..e356b9d9c9a 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -51,7 +51,10 @@ def test_sum(arrow_type): ('double', np.arange(0, 0.5, 0.1)), ('string', ['a', 'b', None, 'ddd', 'ee']), ('binary', [b'a', b'b', b'c', b'ddd', b'ee']), - (pa.binary(3), [b'abc', b'bcd', b'cde', b'def', b'efg']) + (pa.binary(3), [b'abc', b'bcd', b'cde', b'def', b'efg']), + (pa.list_(pa.int8()), [[1, 2], [3, 4], [5, 6], None, [9, 16]]), + (pa.struct([('a', pa.int8()), ('b', pa.int8())]), [ + {'a': 1, 'b': 2}, None, {'a': 3, 'b': 4}, None, {'a': 5, 'b': 6}]), ]) def test_take(ty, values): arr = pa.array(values, type=ty) @@ -100,14 +103,3 @@ def test_take_dictionary(ordered): assert result.to_pylist() == ['a', 'b', 'a'] assert result.dictionary.to_pylist() == ['a', 'b', 'c'] assert result.type.ordered is ordered - - -@pytest.mark.parametrize('array', [ - [[1, 2], [3, 4], [5, 6]], - [{'a': 1, 'b': 2}, None, {'a': 3, 'b': 4}], -], ids=['listarray', 'structarray']) -def test_take_notimplemented(array): - array = pa.array(array) - indices = pa.array([0, 2]) - with pytest.raises(NotImplementedError): - array.take(indices) From abc1733bdeb32ee43cdae31297be0830fedbeb3f Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 25 Jun 2019 07:37:15 -0400 Subject: [PATCH 07/18] simplify looping through IndexSequences --- cpp/src/arrow/compute/kernels/filter.cc | 12 +- cpp/src/arrow/compute/kernels/take-internal.h | 206 ++++++++---------- cpp/src/arrow/compute/kernels/take.cc | 9 +- 3 files changed, 103 insertions(+), 124 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 40a0835901d..7cc027c0ad1 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -35,21 +35,17 @@ using internal::checked_pointer_cast; class FilterIndexSequence { public: - static constexpr int64_t take_null_index = std::numeric_limits::min(); - static constexpr bool never_out_of_bounds = true; + constexpr bool never_out_of_bounds() const { return true; } FilterIndexSequence(const BooleanArray& filter, int64_t out_length) : filter_(&filter), out_length_(out_length) {} - int64_t Next() { + std::pair Next() { while (filter_->IsValid(index_) && !filter_->Value(index_)) { ++index_; } - if (filter_->IsNull(index_)) { - ++index_; - return take_null_index; - } - return index_++; + bool is_valid = filter_->IsValid(index_); + return std::make_pair(index_++, is_valid); } int64_t length() const { return out_length_; } diff --git a/cpp/src/arrow/compute/kernels/take-internal.h b/cpp/src/arrow/compute/kernels/take-internal.h index 83aa928eddb..cd897d55c18 100644 --- a/cpp/src/arrow/compute/kernels/take-internal.h +++ b/cpp/src/arrow/compute/kernels/take-internal.h @@ -55,6 +55,57 @@ static Status UnsafeAppend(StringBuilder* builder, util::string_view value) { return Status::OK(); } +template +Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { + for (int64_t i = 0; i < indices.length(); ++i) { + auto index_valid = indices.Next(); + if (SomeIndicesNull && !index_valid.second) { + RETURN_NOT_OK(vis(0, false)); + continue; + } + + auto index = index_valid.first; + if (!NeverOutOfBounds) { + if (index < 0 || index >= values.length()) { + return Status::IndexError("take index out of bounds"); + } + } + + bool is_valid = !SomeValuesNull || values.IsValid(index); + RETURN_NOT_OK(vis(index, is_valid)); + } + return Status::OK(); +} + +template +Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { + if (indices.never_out_of_bounds()) { + return VisitIndices( + indices, values, std::forward(vis)); + } + return VisitIndices(indices, values, + std::forward(vis)); +} + +template +Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { + if (values.null_count() == 0) { + return VisitIndices(indices, values, + std::forward(vis)); + } + return VisitIndices(indices, values, std::forward(vis)); +} + +template +Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { + if (indices.null_count() == 0) { + return VisitIndices(indices, values, std::forward(vis)); + } + return VisitIndices(indices, values, std::forward(vis)); +} + template class Taker { public: @@ -75,17 +126,9 @@ class Taker { static_assert(std::is_copy_constructible::value, "Index sequences must be copy constructible"); - static_assert( - IndexSequence::take_null_index == std::numeric_limits::min(), - "Index sequences must declare a taken element as null with index == LONG_MIN"); - - static_assert( - std::is_same::value, - "Index sequences must declare whether bounds checking is necessary"); - - static_assert( - std::is_same().Next()), int64_t>::value, - "An index sequence must yield indices of type int64_t."); + static_assert(std::is_same().Next()), + std::pair>::value, + "An index sequence must yield pairs of indices:int64_t, validity:bool."); static_assert(std::is_same().length()), int64_t>::value, @@ -95,27 +138,12 @@ class Taker { int64_t>::value, "An index sequence must provide the number of nulls it will take."); - protected: - Status OutOfBounds() { return Status::IndexError("take index out of bounds"); } - - Status BoundsCheck(const Array& values, int64_t index) { - if (IndexSequence::never_out_of_bounds) { - return Status::OK(); - } - if (index == IndexSequence::take_null_index) { - return Status::OK(); - } - if (index < 0 || index >= values.length()) { - return OutOfBounds(); - } - return Status::OK(); - } - - Status BoundsCheckedNext(const Array& values, IndexSequence* indices, int64_t* index) { - *index = indices->Next(); - return BoundsCheck(values, *index); - } + static_assert( + std::is_same().never_out_of_bounds()), + bool>::value, + "Index sequences must declare whether bounds checking is necessary"); + protected: template Status MakeBuilder(MemoryPool* pool, std::unique_ptr* out) { std::unique_ptr builder; @@ -129,12 +157,11 @@ class Taker { class RangeIndexSequence { public: - static constexpr int64_t take_null_index = std::numeric_limits::min(); - static constexpr bool never_out_of_bounds = true; + constexpr bool never_out_of_bounds() const { return true; } RangeIndexSequence(int64_t offset, int64_t length) : index_(offset), length_(length) {} - int64_t Next() { return index_++; } + std::pair Next() { return std::make_pair(index_++, true); } int64_t length() const { return length_; } @@ -146,24 +173,19 @@ class RangeIndexSequence { class RangeOrNullIndexSequence { public: - static constexpr int64_t take_null_index = std::numeric_limits::min(); - static constexpr bool never_out_of_bounds = true; + constexpr bool never_out_of_bounds() const { return true; } - RangeOrNullIndexSequence(int64_t offset_or_null, int64_t length) - : index_(offset_or_null), length_(length) {} + RangeOrNullIndexSequence(bool is_valid, int64_t offset, int64_t length) + : is_valid_(is_valid), index_(offset), length_(length) {} - int64_t Next() { - if (index_ == take_null_index) { - return take_null_index; - } - return index_++; - } + std::pair Next() { return std::make_pair(index_++, is_valid_); } int64_t length() const { return length_; } - int64_t null_count() const { return index_ == take_null_index ? length_ : 0; } + int64_t null_count() const { return is_valid_ ? 0 : length_; } private: + bool is_valid_ = true; int64_t index_ = 0, length_ = -1; }; @@ -180,14 +202,16 @@ class TakerImpl : public Taker { Status Take(const Array& values, IndexSequence indices) override { DCHECK(this->type_->Equals(values.type())); - if (!IndexSequence::never_out_of_bounds) { - for (int64_t index, i = 0; i < indices.length(); ++i) { - RETURN_NOT_OK(this->BoundsCheckedNext(values, &indices, &index)); - } + length_ += indices.length(); + + if (indices.never_out_of_bounds()) { + return Status::OK(); } - length_ += indices.length(); - return Status::OK(); + return VisitIndices(indices, values, [](int64_t, bool) { + // just do bounds checking + return Status::OK(); + }); } Status Finish(std::shared_ptr* out) override { @@ -212,48 +236,19 @@ class TakerImpl : public Taker { Status Take(const Array& values, IndexSequence indices) override { DCHECK(this->type_->Equals(values.type())); RETURN_NOT_OK(builder_->Reserve(indices.length())); - - if (indices.null_count() == 0) { - if (values.null_count() == 0) { - return Take(values, indices); - } else { - return Take(values, indices); - } - } else { - if (values.null_count() == 0) { - return Take(values, indices); - } else { - return Take(values, indices); + return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { + if (!is_valid) { + builder_->UnsafeAppendNull(); + return Status::OK(); } - } + auto value = checked_cast(values).GetView(index); + return UnsafeAppend(builder_.get(), value); + }); } Status Finish(std::shared_ptr* out) override { return builder_->Finish(out); } private: - template - Status Take(const Array& values, IndexSequence indices) { - for (int64_t i = 0; i < indices.length(); ++i) { - int64_t index = indices.Next(); - - if (SomeIndicesNull && index == IndexSequence::take_null_index) { - builder_->UnsafeAppendNull(); - continue; - } - - RETURN_NOT_OK(this->BoundsCheck(values, index)); - - if (SomeValuesNull && values.IsNull(index)) { - builder_->UnsafeAppendNull(); - continue; - } - - auto value = checked_cast(values).GetView(index); - RETURN_NOT_OK(UnsafeAppend(builder_.get(), value)); - } - return Status::OK(); - } - std::unique_ptr builder_; }; @@ -282,11 +277,7 @@ class TakerImpl : public Taker { RETURN_NOT_OK(offset_builder_->Reserve(indices.length() + 1)); int32_t offset = 0; offset_builder_->UnsafeAppend(offset); - - for (int64_t index, i = 0; i < indices.length(); ++i) { - RETURN_NOT_OK(this->BoundsCheckedNext(values, &indices, &index)); - - bool is_valid = index != IndexSequence::take_null_index && values.IsValid(index); + return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { null_bitmap_builder_->UnsafeAppend(is_valid); if (is_valid) { @@ -295,9 +286,10 @@ class TakerImpl : public Taker { list_array.value_length(index)); RETURN_NOT_OK(value_taker_->Take(*list_array.values(), value_indices)); } + offset_builder_->UnsafeAppend(offset); - } - return Status::OK(); + return Status::OK(); + }); } Status Finish(std::shared_ptr* out) override { return FinishAs(out); } @@ -357,19 +349,13 @@ class TakerImpl : public Taker auto list_size = list_array.list_type()->list_size(); RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); - - for (int64_t index, i = 0; i < indices.length(); ++i) { - RETURN_NOT_OK(this->BoundsCheckedNext(values, &indices, &index)); - - bool is_valid = index != IndexSequence::take_null_index && values.IsValid(index); + return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { null_bitmap_builder_->UnsafeAppend(is_valid); - RangeOrNullIndexSequence value_indices( - is_valid ? list_array.value_offset(index) : IndexSequence::take_null_index, - list_size); - RETURN_NOT_OK(value_taker_->Take(*list_array.values(), value_indices)); - } - return Status::OK(); + RangeOrNullIndexSequence value_indices(is_valid, list_array.value_offset(index), + list_size); + return value_taker_->Take(*list_array.values(), value_indices); + }); } Status Finish(std::shared_ptr* out) override { @@ -424,12 +410,10 @@ class TakerImpl : public Taker { // TODO(bkietz) each child is doing bounds checking; this only needs to happen once RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); - for (int64_t i = 0; i < indices.length(); ++i) { - int64_t index = indices.Next(); - null_bitmap_builder_->UnsafeAppend(index != IndexSequence::take_null_index && - values.IsValid(index)); - } - return Status::OK(); + return VisitIndices(indices, values, [&](int64_t, bool is_valid) { + null_bitmap_builder_->UnsafeAppend(is_valid); + return Status::OK(); + }); } Status Finish(std::shared_ptr* out) override { diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 06aab044339..0c26a6158f8 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -34,18 +34,17 @@ using internal::checked_cast; template class ArrayIndexSequence { public: - static constexpr int64_t take_null_index = std::numeric_limits::min(); - static constexpr bool never_out_of_bounds = false; + constexpr bool never_out_of_bounds() const { return false; } explicit ArrayIndexSequence(const Array& indices) : indices_(&checked_cast&>(indices)) {} - int64_t Next() { + std::pair Next() { if (indices_->IsNull(index_)) { ++index_; - return take_null_index; + return std::make_pair(-1, false); } - return static_cast(indices_->Value(index_++)); + return std::make_pair(indices_->Value(index_++), true); } int64_t length() const { return indices_->length(); } From d9c4a1a64a530e30da1164b851e2f646cd8edeb2 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 25 Jun 2019 08:10:49 -0400 Subject: [PATCH 08/18] add Take() permutation inversion test --- cpp/src/arrow/compute/kernels/take-test.cc | 114 +++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/take-test.cc b/cpp/src/arrow/compute/kernels/take-test.cc index 92971882b84..e4fd09dc1a3 100644 --- a/cpp/src/arrow/compute/kernels/take-test.cc +++ b/cpp/src/arrow/compute/kernels/take-test.cc @@ -306,5 +306,119 @@ TEST_F(TestTakeKernelWithStruct, TakeStruct) { ])"); } +class TestPermutationsWithTake : public ComputeFixture, public TestBase { + protected: + void Take(const Int16Array& values, const Int16Array& indices, + std::shared_ptr* out) { + TakeOptions options; + std::shared_ptr boxed_out; + ASSERT_OK(arrow::compute::Take(&this->ctx_, values, indices, options, &boxed_out)); + *out = checked_pointer_cast(std::move(boxed_out)); + } + + std::shared_ptr Take(const Int16Array& values, const Int16Array& indices) { + std::shared_ptr out; + Take(values, indices, &out); + return out; + } + + std::shared_ptr TakeN(uint64_t n, std::shared_ptr array) { + auto power_of_2 = array; + array = Identity(array->length()); + while (n != 0) { + if (n & 1) { + array = Take(*array, *power_of_2); + } + power_of_2 = Take(*power_of_2, *power_of_2); + n >>= 1; + } + return array; + } + + template + void Shuffle(const Int16Array& array, Rng& gen, std::shared_ptr* shuffled) { + auto byte_length = array.length() * sizeof(int16_t); + std::shared_ptr data; + ASSERT_OK(array.values()->Copy(0, byte_length, &data)); + auto mutable_data = reinterpret_cast(data->mutable_data()); + std::shuffle(mutable_data, mutable_data + array.length(), gen); + shuffled->reset(new Int16Array(array.length(), data)); + } + + template + std::shared_ptr Shuffle(const Int16Array& array, Rng& gen) { + std::shared_ptr out; + Shuffle(array, gen, &out); + return out; + } + + void Identity(int64_t length, std::shared_ptr* identity) { + Int16Builder identity_builder; + ASSERT_OK(identity_builder.Resize(length)); + for (int16_t i = 0; i < length; ++i) { + identity_builder.UnsafeAppend(i); + } + ASSERT_OK(identity_builder.Finish(identity)); + } + + std::shared_ptr Identity(int64_t length) { + std::shared_ptr out; + Identity(length, &out); + return out; + } + + std::shared_ptr Inverse(const std::shared_ptr& permutation) { + std::vector cycle_lengths(permutation->length() + 1, false); + auto permutation_to_the_i = permutation; + for (int16_t cycle_length = 1; cycle_length <= permutation->length(); + ++cycle_length) { + cycle_lengths[cycle_length] = HasTrivialCycle(*permutation_to_the_i); + permutation_to_the_i = Take(*permutation, *permutation_to_the_i); + } + + uint64_t cycle_to_identity_length = 1; + for (int16_t cycle_length = permutation->length(); cycle_length > 1; --cycle_length) { + if (!cycle_lengths[cycle_length]) { + continue; + } + if (cycle_to_identity_length % cycle_length == 0) { + continue; + } + if (cycle_to_identity_length > + std::numeric_limits::max() / cycle_length) { + // overflow, can't compute Inverse + return nullptr; + } + cycle_to_identity_length *= cycle_length; + } + + return TakeN(cycle_to_identity_length - 1, permutation); + } + + bool HasTrivialCycle(const Int16Array& permutation) { + for (int64_t i = 0; i < permutation.length(); ++i) { + if (permutation.Value(i) == static_cast(i)) { + return true; + } + } + return false; + } +}; + +TEST_F(TestPermutationsWithTake, InvertPermutation) { + for (int seed : {0, kSeed, kSeed * 2 - 1}) { + std::default_random_engine gen(seed); + for (int16_t length = 0; length < 1 << 10; ++length) { + auto identity = Identity(length); + auto permutation = Shuffle(*identity, gen); + auto inverse = Inverse(permutation); + if (inverse == nullptr) { + break; + } + ASSERT_TRUE(Take(*inverse, *permutation)->Equals(identity)); + } + } +} + } // namespace compute } // namespace arrow From e6081b027163f3446b1587027aa1c4b91ad7c2c1 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 25 Jun 2019 08:21:10 -0400 Subject: [PATCH 09/18] remove redundant bounds checking in Struct case --- cpp/src/arrow/compute/kernels/filter.cc | 1 + cpp/src/arrow/compute/kernels/take-internal.h | 24 +++++++++++++------ cpp/src/arrow/compute/kernels/take.cc | 4 +++- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 7cc027c0ad1..aec79b4c500 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -36,6 +36,7 @@ using internal::checked_pointer_cast; class FilterIndexSequence { public: constexpr bool never_out_of_bounds() const { return true; } + void set_never_out_of_bounds() {} FilterIndexSequence(const BooleanArray& filter, int64_t out_length) : filter_(&filter), out_length_(out_length) {} diff --git a/cpp/src/arrow/compute/kernels/take-internal.h b/cpp/src/arrow/compute/kernels/take-internal.h index cd897d55c18..a6188370438 100644 --- a/cpp/src/arrow/compute/kernels/take-internal.h +++ b/cpp/src/arrow/compute/kernels/take-internal.h @@ -143,6 +143,11 @@ class Taker { bool>::value, "Index sequences must declare whether bounds checking is necessary"); + static_assert( + std::is_same().set_never_out_of_bounds()), + void>::value, + "An index sequence must support ignoring bounds checking."); + protected: template Status MakeBuilder(MemoryPool* pool, std::unique_ptr* out) { @@ -158,6 +163,7 @@ class Taker { class RangeIndexSequence { public: constexpr bool never_out_of_bounds() const { return true; } + void set_never_out_of_bounds() {} RangeIndexSequence(int64_t offset, int64_t length) : index_(offset), length_(length) {} @@ -174,6 +180,7 @@ class RangeIndexSequence { class RangeOrNullIndexSequence { public: constexpr bool never_out_of_bounds() const { return true; } + void set_never_out_of_bounds() {} RangeOrNullIndexSequence(bool is_valid, int64_t offset, int64_t length) : is_valid_(is_valid), index_(offset), length_(length) {} @@ -403,17 +410,20 @@ class TakerImpl : public Taker { Status Take(const Array& values, IndexSequence indices) override { DCHECK(this->type_->Equals(values.type())); + RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); + RETURN_NOT_OK(VisitIndices(indices, values, [&](int64_t, bool is_valid) { + null_bitmap_builder_->UnsafeAppend(is_valid); + return Status::OK(); + })); + + // bounds checking was done while appending to the null bitmap + indices.set_never_out_of_bounds(); + const auto& struct_array = checked_cast(values); for (int i = 0; i < this->type_->num_children(); ++i) { RETURN_NOT_OK(children_[i]->Take(*struct_array.field(i), indices)); } - // TODO(bkietz) each child is doing bounds checking; this only needs to happen once - - RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); - return VisitIndices(indices, values, [&](int64_t, bool is_valid) { - null_bitmap_builder_->UnsafeAppend(is_valid); - return Status::OK(); - }); + return Status::OK(); } Status Finish(std::shared_ptr* out) override { diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 0c26a6158f8..54e452f9617 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -34,7 +34,8 @@ using internal::checked_cast; template class ArrayIndexSequence { public: - constexpr bool never_out_of_bounds() const { return false; } + bool never_out_of_bounds() const { return never_out_of_bounds_; } + void set_never_out_of_bounds() { never_out_of_bounds_ = true; } explicit ArrayIndexSequence(const Array& indices) : indices_(&checked_cast&>(indices)) {} @@ -54,6 +55,7 @@ class ArrayIndexSequence { private: const NumericArray* indices_; int64_t index_ = 0; + bool never_out_of_bounds_ = false; }; template From 55854836dd29055b9dd1bfd1b7eae35cf22a27cb Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 25 Jun 2019 09:10:31 -0400 Subject: [PATCH 10/18] add doccomments --- cpp/src/arrow/compute/kernels/filter.cc | 4 + cpp/src/arrow/compute/kernels/take-internal.h | 148 ++++++++---------- cpp/src/arrow/compute/kernels/take.cc | 1 + cpp/src/arrow/compute/kernels/take.h | 12 ++ 4 files changed, 82 insertions(+), 83 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index aec79b4c500..60cd393e5dd 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -33,8 +33,11 @@ namespace compute { using internal::checked_cast; using internal::checked_pointer_cast; +// IndexSequence which yields the indices of positions in a BooleanArray +// which are either null or true class FilterIndexSequence { public: + // constexpr so we'll never instantiate bounds checking constexpr bool never_out_of_bounds() const { return true; } void set_never_out_of_bounds() {} @@ -42,6 +45,7 @@ class FilterIndexSequence { : filter_(&filter), out_length_(out_length) {} std::pair Next() { + // skip until an index is found at which the filter is either null or true while (filter_->IsValid(index_) && !filter_->Value(index_)) { ++index_; } diff --git a/cpp/src/arrow/compute/kernels/take-internal.h b/cpp/src/arrow/compute/kernels/take-internal.h index a6188370438..835803e99a7 100644 --- a/cpp/src/arrow/compute/kernels/take-internal.h +++ b/cpp/src/arrow/compute/kernels/take-internal.h @@ -43,18 +43,25 @@ static Status UnsafeAppend(Builder* builder, Scalar&& value) { return Status::OK(); } +// Use BinaryBuilder::UnsafeAppend, but reserve byte storage first static Status UnsafeAppend(BinaryBuilder* builder, util::string_view value) { RETURN_NOT_OK(builder->ReserveData(static_cast(value.size()))); builder->UnsafeAppend(value); return Status::OK(); } +// Use StringBuilder::UnsafeAppend, but reserve character storage first static Status UnsafeAppend(StringBuilder* builder, util::string_view value) { RETURN_NOT_OK(builder->ReserveData(static_cast(value.size()))); builder->UnsafeAppend(value); return Status::OK(); } +/// \brief visit indices from an IndexSequence while bounds checking +/// +/// \param[in] indices IndexSequence to visit +/// \param[in] values array to bounds check against, if necessary +/// \param[in] vis index visitor, signature must be Status(int64_t index, bool is_valid) template Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { @@ -106,6 +113,7 @@ Status VisitIndices(IndexSequence indices, const Array& values, Visitor&& vis) { return VisitIndices(indices, values, std::forward(vis)); } +// Helper class for gathering values from an array template class Taker { public: @@ -113,14 +121,20 @@ class Taker { virtual ~Taker() = default; + // construct any children, must be called once after construction virtual Status MakeChildren() { return Status::OK(); } + // reset this Taker, prepare to gather into an array allocated from pool + // must be called each time the output pool may have changed virtual Status Init(MemoryPool* pool) = 0; + // gather elements from an array at the provided indices virtual Status Take(const Array& values, IndexSequence indices) = 0; + // assemble an array of all gathered values virtual Status Finish(std::shared_ptr*) = 0; + // factory; the output Taker will support gathering values of the given type static Status Make(const std::shared_ptr& type, std::unique_ptr* out); static_assert(std::is_copy_constructible::value, @@ -160,45 +174,61 @@ class Taker { std::shared_ptr type_; }; +// an IndexSequence which yields indices from a specified range +// or yields null for the length of that range class RangeIndexSequence { public: constexpr bool never_out_of_bounds() const { return true; } void set_never_out_of_bounds() {} - RangeIndexSequence(int64_t offset, int64_t length) : index_(offset), length_(length) {} + RangeIndexSequence(bool is_valid, int64_t offset, int64_t length) + : is_valid_(is_valid), index_(offset), length_(length) {} - std::pair Next() { return std::make_pair(index_++, true); } + std::pair Next() { return std::make_pair(index_++, is_valid_); } int64_t length() const { return length_; } - int64_t null_count() const { return 0; } + int64_t null_count() const { return is_valid_ ? 0 : length_; } private: + bool is_valid_ = true; int64_t index_ = 0, length_ = -1; }; -class RangeOrNullIndexSequence { +// Default implementation: taking from a simple array into a builder requires only that +// the array supports array.GetView() and the corresponding builder supports +// builder.UnsafeAppend(array.GetView()) +template +class TakerImpl : public Taker { public: - constexpr bool never_out_of_bounds() const { return true; } - void set_never_out_of_bounds() {} + using ArrayType = typename TypeTraits::ArrayType; + using BuilderType = typename TypeTraits::BuilderType; - RangeOrNullIndexSequence(bool is_valid, int64_t offset, int64_t length) - : is_valid_(is_valid), index_(offset), length_(length) {} + using Taker::Taker; - std::pair Next() { return std::make_pair(index_++, is_valid_); } + Status Init(MemoryPool* pool) override { return this->MakeBuilder(pool, &builder_); } - int64_t length() const { return length_; } + Status Take(const Array& values, IndexSequence indices) override { + DCHECK(this->type_->Equals(values.type())); + RETURN_NOT_OK(builder_->Reserve(indices.length())); + return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { + if (!is_valid) { + builder_->UnsafeAppendNull(); + return Status::OK(); + } + auto value = checked_cast(values).GetView(index); + return UnsafeAppend(builder_.get(), value); + }); + } - int64_t null_count() const { return is_valid_ ? 0 : length_; } + Status Finish(std::shared_ptr* out) override { return builder_->Finish(out); } private: - bool is_valid_ = true; - int64_t index_ = 0, length_ = -1; + std::unique_ptr builder_; }; -template -class TakerImpl; - +// Gathering from NullArrays is trivial; skip the builder and just +// do bounds checking template class TakerImpl : public Taker { public: @@ -215,10 +245,7 @@ class TakerImpl : public Taker { return Status::OK(); } - return VisitIndices(indices, values, [](int64_t, bool) { - // just do bounds checking - return Status::OK(); - }); + return VisitIndices(indices, values, [](int64_t, bool) { return Status::OK(); }); } Status Finish(std::shared_ptr* out) override { @@ -230,35 +257,6 @@ class TakerImpl : public Taker { int64_t length_ = 0; }; -template -class TakerImpl : public Taker { - public: - using ArrayType = typename TypeTraits::ArrayType; - using BuilderType = typename TypeTraits::BuilderType; - - using Taker::Taker; - - Status Init(MemoryPool* pool) override { return this->MakeBuilder(pool, &builder_); } - - Status Take(const Array& values, IndexSequence indices) override { - DCHECK(this->type_->Equals(values.type())); - RETURN_NOT_OK(builder_->Reserve(indices.length())); - return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { - if (!is_valid) { - builder_->UnsafeAppendNull(); - return Status::OK(); - } - auto value = checked_cast(values).GetView(index); - return UnsafeAppend(builder_.get(), value); - }); - } - - Status Finish(std::shared_ptr* out) override { return builder_->Finish(out); } - - private: - std::unique_ptr builder_; -}; - template class TakerImpl : public Taker { public: @@ -282,14 +280,16 @@ class TakerImpl : public Taker { RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); RETURN_NOT_OK(offset_builder_->Reserve(indices.length() + 1)); + int32_t offset = 0; offset_builder_->UnsafeAppend(offset); + return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { null_bitmap_builder_->UnsafeAppend(is_valid); if (is_valid) { offset += list_array.value_length(index); - RangeIndexSequence value_indices(list_array.value_offset(index), + RangeIndexSequence value_indices(true, list_array.value_offset(index), list_array.value_length(index)); RETURN_NOT_OK(value_taker_->Take(*list_array.values(), value_indices)); } @@ -302,6 +302,8 @@ class TakerImpl : public Taker { Status Finish(std::shared_ptr* out) override { return FinishAs(out); } protected: + // this added method is provided for use by TakerImpl, + // which needs to construct a MapArray rather than a ListArray template Status FinishAs(std::shared_ptr* out) { auto null_count = null_bitmap_builder_->false_count(); @@ -341,7 +343,7 @@ class TakerImpl : public Taker Status MakeChildren() override { const auto& list_type = checked_cast(*this->type_); - return Taker::Make(list_type.value_type(), &value_taker_); + return Taker::Make(list_type.value_type(), &value_taker_); } Status Init(MemoryPool* pool) override { @@ -359,8 +361,10 @@ class TakerImpl : public Taker return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { null_bitmap_builder_->UnsafeAppend(is_valid); - RangeOrNullIndexSequence value_indices(is_valid, list_array.value_offset(index), - list_size); + // for FixedSizeList, null lists are not empty (they also span a segment of + // list_size in the child data), so we must append to value_taker_ even if !is_valid + RangeIndexSequence value_indices(is_valid, list_array.value_offset(index), + list_size); return value_taker_->Take(*list_array.values(), value_indices); }); } @@ -382,7 +386,7 @@ class TakerImpl : public Taker protected: std::unique_ptr> null_bitmap_builder_; - std::unique_ptr> value_taker_; + std::unique_ptr> value_taker_; }; template @@ -447,6 +451,7 @@ class TakerImpl : public Taker { std::vector>> children_; }; +// taking from a DictionaryArray is accomplished by taking from its indices template class TakerImpl : public Taker { public: @@ -487,6 +492,7 @@ class TakerImpl : public Taker { std::unique_ptr> index_taker_; }; +// taking from an ExtensionArray is accomplished by taking from its storage template class TakerImpl : public Taker { public: @@ -518,40 +524,16 @@ class TakerImpl : public Taker { template struct TakerMakeImpl { - Status Visit(const NullType&) { return Make(); } - - template - typename std::enable_if::value, Status>::type - Visit(const Fixed&) { - return Make(); - } - - Status Visit(const BinaryType&) { return Make(); } - - Status Visit(const StringType&) { return Make(); } - - Status Visit(const ListType&) { return Make(); } - - Status Visit(const MapType&) { return Make(); } - - Status Visit(const FixedSizeListType&) { return Make(); } - - Status Visit(const StructType& t) { return Make(); } - - Status Visit(const DictionaryType& t) { return Make(); } - - Status Visit(const ExtensionType& t) { return Make(); } - - Status Visit(const DataType& t) { - return Status::NotImplemented("gathering values of type ", t); - } - template - Status Make() { + Status Visit(const T&) { out_->reset(new TakerImpl(type_)); return (*out_)->MakeChildren(); } + Status Visit(const UnionType& t) { + return Status::NotImplemented("gathering values of type ", t); + } + std::shared_ptr type_; std::unique_ptr>* out_; }; diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 54e452f9617..ede8e5f9e7d 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -31,6 +31,7 @@ namespace compute { using internal::checked_cast; +// an IndexSequence which yields the values of an Array of integers template class ArrayIndexSequence { public: diff --git a/cpp/src/arrow/compute/kernels/take.h b/cpp/src/arrow/compute/kernels/take.h index d0d08513f19..f064b7265ed 100644 --- a/cpp/src/arrow/compute/kernels/take.h +++ b/cpp/src/arrow/compute/kernels/take.h @@ -70,15 +70,27 @@ class ARROW_EXPORT TakeKernel : public BinaryKernel { explicit TakeKernel(const std::shared_ptr& type, TakeOptions options = {}) : type_(type) {} + /// \brief BinaryKernel interface + /// + /// delegates to subclasses via Take() Status Call(FunctionContext* ctx, const Datum& values, const Datum& indices, Datum* out) override; + /// \brief output type of this kernel (identical to type of values taken) std::shared_ptr out_type() const override { return type_; } + /// \brief factory for TakeKernels + /// + /// \param[in] value_type constructed TakeKernel will support taking + /// values of this type + /// \param[in] index_type constructed TakeKernel will support taking + /// with indices of this type + /// \param[out] out created kernel static Status Make(const std::shared_ptr& value_type, const std::shared_ptr& index_type, std::unique_ptr* out); + /// \brief single-array implementation virtual Status Take(FunctionContext* ctx, const Array& values, const Array& indices, std::shared_ptr* out) = 0; From 0fe81648e6ed254c21fa9665656cea5f5ad03b90 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 25 Jun 2019 10:29:25 -0400 Subject: [PATCH 11/18] added requested tests and ValidateArray calls --- cpp/src/arrow/array/builder_primitive.cc | 4 +- cpp/src/arrow/buffer-builder.h | 3 ++ cpp/src/arrow/compute/kernels/filter-test.cc | 47 +++++++++++++++++- cpp/src/arrow/compute/kernels/filter.cc | 3 ++ cpp/src/arrow/compute/kernels/take-internal.h | 7 ++- cpp/src/arrow/compute/kernels/take-test.cc | 48 ++++++++++++++++++- 6 files changed, 102 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/array/builder_primitive.cc b/cpp/src/arrow/array/builder_primitive.cc index 3c899c068cb..bad5b00591a 100644 --- a/cpp/src/arrow/array/builder_primitive.cc +++ b/cpp/src/arrow/array/builder_primitive.cc @@ -65,9 +65,9 @@ Status BooleanBuilder::Resize(int64_t capacity) { } Status BooleanBuilder::FinishInternal(std::shared_ptr* out) { - std::shared_ptr data, null_bitmap; - RETURN_NOT_OK(data_builder_.Finish(&data)); + std::shared_ptr null_bitmap, data; RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + RETURN_NOT_OK(data_builder_.Finish(&data)); *out = ArrayData::Make(boolean(), length_, {null_bitmap, data}, null_count_); diff --git a/cpp/src/arrow/buffer-builder.h b/cpp/src/arrow/buffer-builder.h index f069ea4d7dd..85f36ee3f5a 100644 --- a/cpp/src/arrow/buffer-builder.h +++ b/cpp/src/arrow/buffer-builder.h @@ -145,6 +145,9 @@ class ARROW_EXPORT BufferBuilder { ARROW_RETURN_NOT_OK(Resize(size_, shrink_to_fit)); if (size_ != 0) buffer_->ZeroPadding(); *out = buffer_; + if (*out == NULLPTR) { + ARROW_RETURN_NOT_OK(AllocateBuffer(pool_, 0, out)); + } Reset(); return Status::OK(); } diff --git a/cpp/src/arrow/compute/kernels/filter-test.cc b/cpp/src/arrow/compute/kernels/filter-test.cc index f88b23b959c..033efee5f68 100644 --- a/cpp/src/arrow/compute/kernels/filter-test.cc +++ b/cpp/src/arrow/compute/kernels/filter-test.cc @@ -44,23 +44,29 @@ class TestFilterKernel : public ComputeFixture, public TestBase { const std::shared_ptr& expected) { std::shared_ptr actual; ASSERT_OK(arrow::compute::Filter(&this->ctx_, *values, *filter, &actual)); + ASSERT_OK(ValidateArray(*actual)); AssertArraysEqual(*expected, *actual); } + void AssertFilter(const std::shared_ptr& type, const std::string& values, const std::string& filter, const std::string& expected) { std::shared_ptr actual; ASSERT_OK(this->Filter(type, values, filter, &actual)); + ASSERT_OK(ValidateArray(*actual)); AssertArraysEqual(*ArrayFromJSON(type, expected), *actual); } + Status Filter(const std::shared_ptr& type, const std::string& values, const std::string& filter, std::shared_ptr* out) { return arrow::compute::Filter(&this->ctx_, *ArrayFromJSON(type, values), *ArrayFromJSON(boolean(), filter), out); } + void ValidateFilter(const std::shared_ptr& values, const std::shared_ptr& filter_boxed) { std::shared_ptr filtered; ASSERT_OK(arrow::compute::Filter(&this->ctx_, *values, *filter_boxed, &filtered)); + ASSERT_OK(ValidateArray(*filtered)); auto filter = checked_pointer_cast(filter_boxed); int64_t values_i = 0, filtered_i = 0; @@ -86,11 +92,13 @@ class TestFilterKernelWithNull : public TestFilterKernel { protected: void AssertFilter(const std::string& values, const std::string& filter, const std::string& expected) { - TestFilterKernel::AssertFilter(utf8(), values, filter, expected); + TestFilterKernel::AssertFilter(null(), values, filter, expected); } }; TEST_F(TestFilterKernelWithNull, FilterNull) { + this->AssertFilter("[]", "[]", "[]"); + this->AssertFilter("[null, null, null]", "[0, 1, 0]", "[null]"); this->AssertFilter("[null, null, null]", "[1, 1, 0]", "[null, null]"); } @@ -104,6 +112,8 @@ class TestFilterKernelWithBoolean : public TestFilterKernel { }; TEST_F(TestFilterKernelWithBoolean, FilterBoolean) { + this->AssertFilter("[]", "[]", "[]"); + this->AssertFilter("[true, false, true]", "[0, 1, 0]", "[false]"); this->AssertFilter("[null, false, true]", "[0, 1, 0]", "[false]"); this->AssertFilter("[true, false, true]", "[null, 1, 0]", "[null, false]"); @@ -116,6 +126,7 @@ class TestFilterKernelWithNumeric : public TestFilterKernel { const std::string& expected) { TestFilterKernel::AssertFilter(type_singleton(), values, filter, expected); } + std::shared_ptr type_singleton() { return TypeTraits::type_singleton(); } @@ -137,13 +148,16 @@ TYPED_TEST(TestFilterKernelWithNumeric, FilterNumeric) { this->AssertFilter("[null, 8, 9]", "[0, 1, 0]", "[8]"); this->AssertFilter("[7, 8, 9]", "[null, 1, 0]", "[null, 8]"); this->AssertFilter("[7, 8, 9]", "[1, null, 1]", "[7, null, 9]"); + + std::shared_ptr arr; + ASSERT_RAISES(Invalid, this->Filter(this->type_singleton(), "[7, 8, 9]", "[]", &arr)); } TYPED_TEST(TestFilterKernelWithNumeric, FilterRandomNumeric) { auto rand = random::RandomArrayGenerator(kSeed); for (size_t i = 3; i < 13; i++) { const int64_t length = static_cast(1ULL << i); - for (auto null_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) { + for (auto null_probability : {0.0, 0.01, 0.25, 1.0}) { for (auto filter_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) { auto values = rand.Numeric(length, 0, 127, null_probability); auto filter = rand.Boolean(length, filter_probability, null_probability); @@ -208,6 +222,7 @@ TYPED_TEST(TestFilterKernelWithNumeric, CompareScalarAndFilterRandomNumeric) { &selection)); ASSERT_OK(arrow::compute::Filter(&this->ctx_, Datum(array), selection, &filtered)); auto filtered_array = filtered.make_array(); + ASSERT_OK(ValidateArray(*filtered_array)); auto expected = CompareAndFilter(array->raw_values(), array->length(), c_fifty, op); ASSERT_ARRAYS_EQUAL(*filtered_array, *expected); @@ -232,6 +247,7 @@ TYPED_TEST(TestFilterKernelWithNumeric, CompareArrayAndFilterRandomNumeric) { &selection)); ASSERT_OK(arrow::compute::Filter(&this->ctx_, Datum(lhs), selection, &filtered)); auto filtered_array = filtered.make_array(); + ASSERT_OK(ValidateArray(*filtered_array)); auto expected = CompareAndFilter(lhs->raw_values(), lhs->length(), rhs->raw_values(), op); ASSERT_ARRAYS_EQUAL(*filtered_array, *expected); @@ -261,6 +277,7 @@ TYPED_TEST(TestFilterKernelWithNumeric, ScalarInRangeAndFilterRandomNumeric) { &selection)); ASSERT_OK(arrow::compute::Filter(&this->ctx_, Datum(array), selection, &filtered)); auto filtered_array = filtered.make_array(); + ASSERT_OK(ValidateArray(*filtered_array)); auto expected = CompareAndFilter( array->raw_values(), array->length(), [&](CType e) { return (e > c_fifty) && (e < c_hundred); }); @@ -315,6 +332,32 @@ TEST_F(TestFilterKernelWithList, FilterListInt32) { this->AssertFilter(list(int32()), list_json, "[0, 1, 0, 1]", "[[1,2], [3]]"); } +TEST_F(TestFilterKernelWithList, FilterListListInt32) { + std::string list_json = R"([ + [], + [[1], [2, null, 2], []], + null, + [[3, null], null] + ])"; + auto type = list(list(int32())); + this->AssertFilter(type, list_json, "[0, 0, 0, 0]", "[]"); + this->AssertFilter(type, list_json, "[0, 1, 1, null]", R"([ + [[1], [2, null, 2], []], + null, + null + ])"); + this->AssertFilter(type, list_json, "[0, 0, 1, null]", "[null, null]"); + this->AssertFilter(type, list_json, "[1, 0, 0, 1]", R"([ + [], + [[3, null], null] + ])"); + this->AssertFilter(type, list_json, "[1, 1, 1, 1]", list_json); + this->AssertFilter(type, list_json, "[0, 1, 0, 1]", R"([ + [[1], [2, null, 2], []], + [[3, null], null] + ])"); +} + class TestFilterKernelWithFixedSizeList : public TestFilterKernel {}; TEST_F(TestFilterKernelWithFixedSizeList, FilterFixedSizeListInt32) { diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 60cd393e5dd..d55088a519b 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -83,6 +83,9 @@ class FilterKernelImpl : public FilterKernel { Status Filter(FunctionContext* ctx, const Array& values, const BooleanArray& filter, int64_t length, std::shared_ptr* out) override { + if (values.length() != filter.length()) { + return Status::Invalid("filter and value array must have identical lengths"); + } RETURN_NOT_OK(taker_->Init(ctx->memory_pool())); RETURN_NOT_OK(taker_->Take(values, FilterIndexSequence(filter, length))); return taker_->Finish(out); diff --git a/cpp/src/arrow/compute/kernels/take-internal.h b/cpp/src/arrow/compute/kernels/take-internal.h index 835803e99a7..e5bbf30473c 100644 --- a/cpp/src/arrow/compute/kernels/take-internal.h +++ b/cpp/src/arrow/compute/kernels/take-internal.h @@ -270,6 +270,7 @@ class TakerImpl : public Taker { Status Init(MemoryPool* pool) override { null_bitmap_builder_.reset(new TypedBufferBuilder(pool)); offset_builder_.reset(new TypedBufferBuilder(pool)); + RETURN_NOT_OK(offset_builder_->Append(0)); return value_taker_->Init(pool); } @@ -279,11 +280,9 @@ class TakerImpl : public Taker { const auto& list_array = checked_cast(values); RETURN_NOT_OK(null_bitmap_builder_->Reserve(indices.length())); - RETURN_NOT_OK(offset_builder_->Reserve(indices.length() + 1)); - - int32_t offset = 0; - offset_builder_->UnsafeAppend(offset); + RETURN_NOT_OK(offset_builder_->Reserve(indices.length())); + int32_t offset = offset_builder_->data()[offset_builder_->length() - 1]; return VisitIndices(indices, values, [&](int64_t index, bool is_valid) { null_bitmap_builder_->UnsafeAppend(is_valid); diff --git a/cpp/src/arrow/compute/kernels/take-test.cc b/cpp/src/arrow/compute/kernels/take-test.cc index e4fd09dc1a3..8626d6a242e 100644 --- a/cpp/src/arrow/compute/kernels/take-test.cc +++ b/cpp/src/arrow/compute/kernels/take-test.cc @@ -44,17 +44,21 @@ class TestTakeKernel : public ComputeFixture, public TestBase { std::shared_ptr actual; TakeOptions options; ASSERT_OK(arrow::compute::Take(&this->ctx_, *values, *indices, options, &actual)); + ASSERT_OK(ValidateArray(*actual)); AssertArraysEqual(*expected, *actual); } + void AssertTake(const std::shared_ptr& type, const std::string& values, const std::string& indices, const std::string& expected) { std::shared_ptr actual; for (auto index_type : {int8(), uint32()}) { ASSERT_OK(this->Take(type, values, index_type, indices, &actual)); + ASSERT_OK(ValidateArray(*actual)); AssertArraysEqual(*ArrayFromJSON(type, expected), *actual); } } + Status Take(const std::shared_ptr& type, const std::string& values, const std::shared_ptr& index_type, const std::string& indices, std::shared_ptr* out) { @@ -68,7 +72,7 @@ class TestTakeKernelWithNull : public TestTakeKernel { protected: void AssertTake(const std::string& values, const std::string& indices, const std::string& expected) { - TestTakeKernel::AssertTake(utf8(), values, indices, expected); + TestTakeKernel::AssertTake(null(), values, indices, expected); } }; @@ -78,6 +82,8 @@ TEST_F(TestTakeKernelWithNull, TakeNull) { std::shared_ptr arr; ASSERT_RAISES(IndexError, this->Take(null(), "[null, null, null]", int8(), "[0, 9, 0]", &arr)); + ASSERT_RAISES(IndexError, + this->Take(boolean(), "[null, null, null]", int8(), "[0, -1, 0]", &arr)); } TEST_F(TestTakeKernelWithNull, InvalidIndexType) { @@ -95,6 +101,7 @@ class TestTakeKernelWithBoolean : public TestTakeKernel { }; TEST_F(TestTakeKernelWithBoolean, TakeBoolean) { + this->AssertTake("[7, 8, 9]", "[]", "[]"); this->AssertTake("[true, false, true]", "[0, 1, 0]", "[true, false, true]"); this->AssertTake("[null, false, true]", "[0, 1, 0]", "[null, false, null]"); this->AssertTake("[true, false, true]", "[null, 1, 0]", "[null, false, true]"); @@ -102,6 +109,8 @@ TEST_F(TestTakeKernelWithBoolean, TakeBoolean) { std::shared_ptr arr; ASSERT_RAISES(IndexError, this->Take(boolean(), "[true, false, true]", int8(), "[0, 9, 0]", &arr)); + ASSERT_RAISES(IndexError, + this->Take(boolean(), "[true, false, true]", int8(), "[0, -1, 0]", &arr)); } template @@ -111,15 +120,18 @@ class TestTakeKernelWithNumeric : public TestTakeKernel { const std::string& expected) { TestTakeKernel::AssertTake(type_singleton(), values, indices, expected); } + std::shared_ptr type_singleton() { return TypeTraits::type_singleton(); } + void ValidateTake(const std::shared_ptr& values, const std::shared_ptr& indices_boxed) { std::shared_ptr taken; TakeOptions options; ASSERT_OK( arrow::compute::Take(&this->ctx_, *values, *indices_boxed, options, &taken)); + ASSERT_OK(ValidateArray(*taken)); ASSERT_EQ(indices_boxed->length(), taken->length()); ASSERT_EQ(indices_boxed->type_id(), Type::INT32); @@ -137,14 +149,18 @@ class TestTakeKernelWithNumeric : public TestTakeKernel { TYPED_TEST_CASE(TestTakeKernelWithNumeric, NumericArrowTypes); TYPED_TEST(TestTakeKernelWithNumeric, TakeNumeric) { + this->AssertTake("[7, 8, 9]", "[]", "[]"); this->AssertTake("[7, 8, 9]", "[0, 1, 0]", "[7, 8, 7]"); this->AssertTake("[null, 8, 9]", "[0, 1, 0]", "[null, 8, null]"); this->AssertTake("[7, 8, 9]", "[null, 1, 0]", "[null, 8, 7]"); this->AssertTake("[null, 8, 9]", "[]", "[]"); + this->AssertTake("[7, 8, 9]", "[0, 0, 0, 0, 0, 0, 2]", "[7, 7, 7, 7, 7, 7, 9]"); std::shared_ptr arr; ASSERT_RAISES(IndexError, this->Take(this->type_singleton(), "[7, 8, 9]", int8(), "[0, 9, 0]", &arr)); + ASSERT_RAISES(IndexError, this->Take(this->type_singleton(), "[7, 8, 9]", int8(), + "[0, -1, 0]", &arr)); } TYPED_TEST(TestTakeKernelWithNumeric, TakeRandomNumeric) { @@ -153,7 +169,7 @@ TYPED_TEST(TestTakeKernelWithNumeric, TakeRandomNumeric) { const int64_t length = static_cast(1ULL << i); for (size_t j = 0; j < 13; j++) { const int64_t indices_length = static_cast(1ULL << j); - for (auto null_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) { + for (auto null_probability : {0.0, 0.01, 0.25, 1.0}) { auto values = rand.Numeric(length, 0, 127, null_probability); auto max_index = static_cast(length - 1); auto filter = rand.Int32(indices_length, 0, max_index, null_probability); @@ -218,6 +234,33 @@ TEST_F(TestTakeKernelWithList, TakeListInt32) { "[[], [], [], [], [], [], [1, 2]]"); } +TEST_F(TestTakeKernelWithList, TakeListListInt32) { + std::string list_json = R"([ + [], + [[1], [2, null, 2], []], + null, + [[3, null], null] + ])"; + auto type = list(list(int32())); + this->AssertTake(type, list_json, "[]", "[]"); + this->AssertTake(type, list_json, "[3, 2, 1]", R"([ + [[3, null], null], + null, + [[1], [2, null, 2], []] + ])"); + this->AssertTake(type, list_json, "[null, 3, 0]", R"([ + null, + [[3, null], null], + [] + ])"); + this->AssertTake(type, list_json, "[null, null]", "[null, null]"); + this->AssertTake(type, list_json, "[3, 0, 0, 3]", + "[[[3, null], null], [], [], [[3, null], null]]"); + this->AssertTake(type, list_json, "[0, 1, 2, 3]", list_json); + this->AssertTake(type, list_json, "[0, 0, 0, 0, 0, 0, 1]", + "[[], [], [], [], [], [], [[1], [2, null, 2], []]]"); +} + class TestTakeKernelWithFixedSizeList : public TestTakeKernel {}; TEST_F(TestTakeKernelWithFixedSizeList, TakeFixedSizeListInt32) { @@ -313,6 +356,7 @@ class TestPermutationsWithTake : public ComputeFixture, public TestBase { TakeOptions options; std::shared_ptr boxed_out; ASSERT_OK(arrow::compute::Take(&this->ctx_, values, indices, options, &boxed_out)); + ASSERT_OK(ValidateArray(*boxed_out)); *out = checked_pointer_cast(std::move(boxed_out)); } From ac0e391aaee544f819bddb983bf4ca6789cd10f3 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 25 Jun 2019 10:49:07 -0400 Subject: [PATCH 12/18] add benchmark for filtering a StringArray --- .../arrow/compute/kernels/filter-benchmark.cc | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/filter-benchmark.cc b/cpp/src/arrow/compute/kernels/filter-benchmark.cc index 3eb460adc02..ab9271ec159 100644 --- a/cpp/src/arrow/compute/kernels/filter-benchmark.cc +++ b/cpp/src/arrow/compute/kernels/filter-benchmark.cc @@ -68,6 +68,24 @@ static void FilterFixedSizeList1Int64(benchmark::State& state) { } } +static void FilterString(benchmark::State& state) { + RegressionArgs args(state); + + const int64_t array_size = args.size / sizeof(int64_t); + auto rand = random::RandomArrayGenerator(kSeed); + auto array = std::static_pointer_cast( + rand.String(array_size, 0, 128, args.null_proportion)); + auto filter = std::static_pointer_cast( + rand.Boolean(array_size, 0.75, args.null_proportion)); + + FunctionContext ctx; + for (auto _ : state) { + Datum out; + ABORT_NOT_OK(Filter(&ctx, Datum(array), Datum(filter), &out)); + benchmark::DoNotOptimize(out); + } +} + BENCHMARK(FilterInt64) ->Apply(RegressionSetArgs) ->Args({1 << 20, 1}) @@ -82,5 +100,12 @@ BENCHMARK(FilterFixedSizeList1Int64) ->MinTime(1.0) ->Unit(benchmark::TimeUnit::kNanosecond); +BENCHMARK(FilterString) + ->Apply(RegressionSetArgs) + ->Args({1 << 20, 1}) + ->Args({1 << 23, 1}) + ->MinTime(1.0) + ->Unit(benchmark::TimeUnit::kNanosecond); + } // namespace compute } // namespace arrow From e73c1ec23cd38ab7b5794dd0ea00586d31055152 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 25 Jun 2019 11:38:51 -0400 Subject: [PATCH 13/18] validate arrays in pyarrow's Take() test --- python/pyarrow/tests/test_compute.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index e356b9d9c9a..37da62c4815 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -61,12 +61,14 @@ def test_take(ty, values): for indices_type in [pa.uint8(), pa.int64()]: indices = pa.array([0, 4, 2, None], type=indices_type) result = arr.take(indices) + result.validate() expected = pa.array([values[0], values[4], values[2], None], type=ty) assert result.equals(expected) # empty indices indices = pa.array([], type=indices_type) result = arr.take(indices) + result.validate() expected = pa.array([], type=ty) assert result.equals(expected) @@ -86,6 +88,7 @@ def test_take_indices_types(): 'uint32', 'int32', 'uint64', 'int64']: indices = pa.array([0, 4, 2, None], type=indices_type) result = arr.take(indices) + result.validate() expected = pa.array([0, 4, 2, None]) assert result.equals(expected) @@ -100,6 +103,7 @@ def test_take_dictionary(ordered): arr = pa.DictionaryArray.from_arrays([0, 1, 2, 0, 1, 2], ['a', 'b', 'c'], ordered=ordered) result = arr.take(pa.array([0, 1, 3])) + result.validate() assert result.to_pylist() == ['a', 'b', 'a'] assert result.dictionary.to_pylist() == ['a', 'b', 'c'] assert result.type.ordered is ordered From 30d5872529a259c7da45d124494488f331576346 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 25 Jun 2019 11:48:50 -0400 Subject: [PATCH 14/18] add LiteralType constructor for gcc 4.8 --- cpp/src/arrow/compute/kernels/filter.cc | 4 +++- cpp/src/arrow/compute/kernels/take-internal.h | 5 +++++ cpp/src/arrow/compute/kernels/take.cc | 4 +++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index d55088a519b..8a07663d5dc 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -41,6 +41,8 @@ class FilterIndexSequence { constexpr bool never_out_of_bounds() const { return true; } void set_never_out_of_bounds() {} + constexpr FilterIndexSequence() = default; + FilterIndexSequence(const BooleanArray& filter, int64_t out_length) : filter_(&filter), out_length_(out_length) {} @@ -58,7 +60,7 @@ class FilterIndexSequence { int64_t null_count() const { return filter_->null_count(); } private: - const BooleanArray* filter_; + const BooleanArray* filter_ = nullptr; int64_t index_ = 0, out_length_ = -1; }; diff --git a/cpp/src/arrow/compute/kernels/take-internal.h b/cpp/src/arrow/compute/kernels/take-internal.h index e5bbf30473c..bacd71b587a 100644 --- a/cpp/src/arrow/compute/kernels/take-internal.h +++ b/cpp/src/arrow/compute/kernels/take-internal.h @@ -137,6 +137,9 @@ class Taker { // factory; the output Taker will support gathering values of the given type static Status Make(const std::shared_ptr& type, std::unique_ptr* out); + static_assert(std::is_literal_type::value, + "Index sequences must be literal type"); + static_assert(std::is_copy_constructible::value, "Index sequences must be copy constructible"); @@ -181,6 +184,8 @@ class RangeIndexSequence { constexpr bool never_out_of_bounds() const { return true; } void set_never_out_of_bounds() {} + constexpr RangeIndexSequence() = default; + RangeIndexSequence(bool is_valid, int64_t offset, int64_t length) : is_valid_(is_valid), index_(offset), length_(length) {} diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index ede8e5f9e7d..6ed9111559b 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -38,6 +38,8 @@ class ArrayIndexSequence { bool never_out_of_bounds() const { return never_out_of_bounds_; } void set_never_out_of_bounds() { never_out_of_bounds_ = true; } + constexpr ArrayIndexSequence() = default; + explicit ArrayIndexSequence(const Array& indices) : indices_(&checked_cast&>(indices)) {} @@ -54,7 +56,7 @@ class ArrayIndexSequence { int64_t null_count() const { return indices_->null_count(); } private: - const NumericArray* indices_; + const NumericArray* indices_ = nullptr; int64_t index_ = 0; bool never_out_of_bounds_ = false; }; From d60ff7c0dc55f90cd892034f80e2886061d52c1e Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 26 Jun 2019 11:26:41 -0400 Subject: [PATCH 15/18] cast size_t -> int16_t, update fixed_size_binary(0) test --- cpp/src/arrow/array-test.cc | 2 +- cpp/src/arrow/compute/kernels/take-test.cc | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc index 606ca7134b4..2005a0db562 100644 --- a/cpp/src/arrow/array-test.cc +++ b/cpp/src/arrow/array-test.cc @@ -1311,7 +1311,7 @@ TEST_F(TestFWBinaryArray, ZeroSize) { const auto& fw_array = checked_cast(*array); // data is never allocated - ASSERT_TRUE(fw_array.values() == nullptr); + ASSERT_EQ(fw_array.values()->size(), 0); ASSERT_EQ(0, fw_array.byte_width()); ASSERT_EQ(6, array->length()); diff --git a/cpp/src/arrow/compute/kernels/take-test.cc b/cpp/src/arrow/compute/kernels/take-test.cc index 8626d6a242e..da5e0c009ef 100644 --- a/cpp/src/arrow/compute/kernels/take-test.cc +++ b/cpp/src/arrow/compute/kernels/take-test.cc @@ -412,16 +412,17 @@ class TestPermutationsWithTake : public ComputeFixture, public TestBase { } std::shared_ptr Inverse(const std::shared_ptr& permutation) { - std::vector cycle_lengths(permutation->length() + 1, false); + auto length = static_cast(permutation->length()); + + std::vector cycle_lengths(length + 1, false); auto permutation_to_the_i = permutation; - for (int16_t cycle_length = 1; cycle_length <= permutation->length(); - ++cycle_length) { + for (int16_t cycle_length = 1; cycle_length <= length; ++cycle_length) { cycle_lengths[cycle_length] = HasTrivialCycle(*permutation_to_the_i); permutation_to_the_i = Take(*permutation, *permutation_to_the_i); } uint64_t cycle_to_identity_length = 1; - for (int16_t cycle_length = permutation->length(); cycle_length > 1; --cycle_length) { + for (int16_t cycle_length = length; cycle_length > 1; --cycle_length) { if (!cycle_lengths[cycle_length]) { continue; } From 5981ee8d4763d9f193b0fb35b3d50f817b5f0501 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 26 Jun 2019 11:40:22 -0400 Subject: [PATCH 16/18] rewrite Filter(string array) benchmark to respect memory budget --- cpp/src/arrow/compute/kernels/filter-benchmark.cc | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter-benchmark.cc b/cpp/src/arrow/compute/kernels/filter-benchmark.cc index ab9271ec159..0ae528b6e54 100644 --- a/cpp/src/arrow/compute/kernels/filter-benchmark.cc +++ b/cpp/src/arrow/compute/kernels/filter-benchmark.cc @@ -71,10 +71,16 @@ static void FilterFixedSizeList1Int64(benchmark::State& state) { static void FilterString(benchmark::State& state) { RegressionArgs args(state); - const int64_t array_size = args.size / sizeof(int64_t); + int32_t string_min_length = 0, string_max_length = 128; + int32_t string_mean_length = (string_max_length + string_min_length) / 2; + // for an array of 50% null strings, we need to generate twice as many strings + // to ensure that they have an average of args.size total characters + auto array_size = + static_cast(args.size / string_mean_length / (1 - args.null_proportion)); + auto rand = random::RandomArrayGenerator(kSeed); - auto array = std::static_pointer_cast( - rand.String(array_size, 0, 128, args.null_proportion)); + auto array = std::static_pointer_cast(rand.String( + array_size, string_min_length, string_max_length, args.null_proportion)); auto filter = std::static_pointer_cast( rand.Boolean(array_size, 0.75, args.null_proportion)); From eaf8302ea2bbfd05f7a7396c44f9b1260c90dba0 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 26 Jun 2019 12:35:43 -0400 Subject: [PATCH 17/18] add benchmarks for Take() --- cpp/src/arrow/compute/kernels/CMakeLists.txt | 1 + .../arrow/compute/kernels/take-benchmark.cc | 148 ++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 cpp/src/arrow/compute/kernels/take-benchmark.cc diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index 1bbb5bc5f32..3d9da8bcc34 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -34,3 +34,4 @@ add_arrow_benchmark(compare-benchmark PREFIX "arrow-compute") add_arrow_test(take-test PREFIX "arrow-compute") add_arrow_test(filter-test PREFIX "arrow-compute") add_arrow_benchmark(filter-benchmark PREFIX "arrow-compute") +add_arrow_benchmark(take-benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/kernels/take-benchmark.cc b/cpp/src/arrow/compute/kernels/take-benchmark.cc new file mode 100644 index 00000000000..d43593981f5 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/take-benchmark.cc @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include "arrow/compute/kernels/take.h" + +#include "arrow/compute/benchmark-util.h" +#include "arrow/compute/test-util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" + +namespace arrow { +namespace compute { + +constexpr auto kSeed = 0x0ff1ce; + +static void TakeBenchmark(benchmark::State& state, const std::shared_ptr& values, + const std::shared_ptr& indices) { + FunctionContext ctx; + TakeOptions options; + for (auto _ : state) { + Datum out; + ABORT_NOT_OK(Take(&ctx, Datum(values), Datum(indices), options, &out)); + benchmark::DoNotOptimize(out); + } +} + +static void TakeInt64(benchmark::State& state) { + RegressionArgs args(state); + + const int64_t array_size = args.size / sizeof(int64_t); + auto rand = random::RandomArrayGenerator(kSeed); + + auto values = rand.Int64(array_size, -100, 100, args.null_proportion); + + auto indices = rand.Int32(array_size, 0, array_size - 1, args.null_proportion); + + TakeBenchmark(state, values, indices); +} + +static void TakeFixedSizeList1Int64(benchmark::State& state) { + RegressionArgs args(state); + + const int64_t array_size = args.size / sizeof(int64_t); + auto rand = random::RandomArrayGenerator(kSeed); + + auto int_array = rand.Int64(array_size, -100, 100, args.null_proportion); + auto values = std::make_shared( + fixed_size_list(int64(), 1), array_size, int_array, int_array->null_bitmap(), + int_array->null_count()); + + auto indices = rand.Int32(array_size, 0, array_size - 1, args.null_proportion); + + TakeBenchmark(state, values, indices); +} + +static void TakeInt64VsFilter(benchmark::State& state) { + RegressionArgs args(state); + + const int64_t array_size = args.size / sizeof(int64_t); + auto rand = random::RandomArrayGenerator(kSeed); + + auto values = rand.Int64(array_size, -100, 100, args.null_proportion); + + auto filter = std::static_pointer_cast( + rand.Boolean(array_size, 0.75, args.null_proportion)); + + Int32Builder indices_builder; + ABORT_NOT_OK(indices_builder.Resize(array_size)); + + for (int64_t i = 0; i < array_size; ++i) { + if (filter->IsNull(i)) { + indices_builder.UnsafeAppendNull(); + } else if (filter->Value(i)) { + indices_builder.UnsafeAppend(static_cast(i)); + } + } + + std::shared_ptr indices; + ABORT_NOT_OK(indices_builder.Finish(&indices)); + TakeBenchmark(state, values, indices); +} + +static void TakeString(benchmark::State& state) { + RegressionArgs args(state); + + int32_t string_min_length = 0, string_max_length = 128; + int32_t string_mean_length = (string_max_length + string_min_length) / 2; + // for an array of 50% null strings, we need to generate twice as many strings + // to ensure that they have an average of args.size total characters + auto array_size = + static_cast(args.size / string_mean_length / (1 - args.null_proportion)); + + auto rand = random::RandomArrayGenerator(kSeed); + auto values = std::static_pointer_cast(rand.String( + array_size, string_min_length, string_max_length, args.null_proportion)); + + auto indices = rand.Int32(array_size, 0, array_size - 1, args.null_proportion); + + TakeBenchmark(state, values, indices); +} + +BENCHMARK(TakeInt64) + ->Apply(RegressionSetArgs) + ->Args({1 << 20, 1}) + ->Args({1 << 23, 1}) + ->MinTime(1.0) + ->Unit(benchmark::TimeUnit::kNanosecond); + +BENCHMARK(TakeFixedSizeList1Int64) + ->Apply(RegressionSetArgs) + ->Args({1 << 20, 1}) + ->Args({1 << 23, 1}) + ->MinTime(1.0) + ->Unit(benchmark::TimeUnit::kNanosecond); + +BENCHMARK(TakeInt64VsFilter) + ->Apply(RegressionSetArgs) + ->Args({1 << 20, 1}) + ->Args({1 << 23, 1}) + ->MinTime(1.0) + ->Unit(benchmark::TimeUnit::kNanosecond); + +BENCHMARK(TakeString) + ->Apply(RegressionSetArgs) + ->Args({1 << 20, 1}) + ->Args({1 << 23, 1}) + ->MinTime(1.0) + ->Unit(benchmark::TimeUnit::kNanosecond); + +} // namespace compute +} // namespace arrow + From 73262bd443575c83ce062e978420b0bc05bc9535 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 26 Jun 2019 14:10:17 -0400 Subject: [PATCH 18/18] clang-format --- cpp/src/arrow/compute/kernels/take-benchmark.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/take-benchmark.cc b/cpp/src/arrow/compute/kernels/take-benchmark.cc index d43593981f5..139e183b92f 100644 --- a/cpp/src/arrow/compute/kernels/take-benchmark.cc +++ b/cpp/src/arrow/compute/kernels/take-benchmark.cc @@ -145,4 +145,3 @@ BENCHMARK(TakeString) } // namespace compute } // namespace arrow -