From 14968fd7b2c1ee75508aa3b5c60eca242fffaa41 Mon Sep 17 00:00:00 2001 From: Frank Du Date: Wed, 26 Aug 2020 00:54:32 +0000 Subject: [PATCH 1/8] Add random test for MinMax Signed-off-by: Frank Du --- cpp/src/arrow/compute/api_aggregate.h | 17 --- .../arrow/compute/kernels/aggregate_test.cc | 107 ++++++++++++++++++ 2 files changed, 107 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index 5ae3cf9b5fe..675bd25a684 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -130,23 +130,6 @@ Result MinMax(const Datum& value, const MinMaxOptions& options = MinMaxOptions::Defaults(), ExecContext* ctx = NULLPTR); -/// \brief Calculate the min / max of a numeric array. -/// -/// This function returns both the min and max as a collection. The resulting -/// datum thus consists of two scalar datums: {Datum(min), Datum(max)} -/// -/// \param[in] array input array -/// \param[in] options see MinMaxOptions for more information -/// \param[in] ctx the function execution context, optional -/// \return resulting datum containing a {min, max} collection -/// -/// \since 1.0.0 -/// \note API not yet finalized -ARROW_EXPORT -Result MinMax(const Array& array, - const MinMaxOptions& options = MinMaxOptions::Defaults(), - ExecContext* ctx = NULLPTR); - /// \brief Calculate the modal (most common) value of a numeric array /// /// This function returns both mode and count as a struct scalar, with type diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 15cfcb42f1d..afa94c84d93 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -594,6 +594,113 @@ TYPED_TEST(TestFloatingMinMaxKernel, DefaultOptions) { AssertDatumsEqual(explicit_defaults, no_options_provided); } +template +using MinMaxResult = std::pair; + +template +static enable_if_integer> NaiveMinMax( + const Array& array) { + using T = typename ArrowType::c_type; + using ArrayType = typename TypeTraits::ArrayType; + + const auto& array_numeric = reinterpret_cast(array); + const auto values = array_numeric.raw_values(); + + if (array.length() <= array.null_count()) { // All null values + return {static_cast(0), static_cast(0)}; + } + + T min = std::numeric_limits::max(); + T max = std::numeric_limits::min(); + if (array.null_count() != 0) { // Some values are null + internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), + array.length()); + for (int64_t i = 0; i < array.length(); i++) { + if (reader.IsSet()) { + min = std::min(min, values[i]); + max = std::max(max, values[i]); + } + reader.Next(); + } + } else { // All true values + for (int64_t i = 0; i < array.length(); i++) { + min = std::min(min, values[i]); + max = std::max(max, values[i]); + } + } + + return {min, max}; +} + +template +static enable_if_floating_point> NaiveMinMax( + const Array& array) { + using T = typename ArrowType::c_type; + using ArrayType = typename TypeTraits::ArrayType; + + const auto& array_numeric = reinterpret_cast(array); + const auto values = array_numeric.raw_values(); + + if (array.length() <= array.null_count()) { // All null values + return {static_cast(0), static_cast(0)}; + } + + T min = std::numeric_limits::infinity(); + T max = -std::numeric_limits::infinity(); + if (array.null_count() != 0) { // Some values are null + internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), + array.length()); + for (int64_t i = 0; i < array.length(); i++) { + if (reader.IsSet()) { + min = std::fmin(min, values[i]); + max = std::fmax(max, values[i]); + } + reader.Next(); + } + } else { // All true values + for (int64_t i = 0; i < array.length(); i++) { + min = std::fmin(min, values[i]); + max = std::fmax(max, values[i]); + } + } + + return {min, max}; +} + +template +void ValidateMinMax(const Array& array) { + using Traits = TypeTraits; + using ScalarType = typename Traits::ScalarType; + + ASSERT_OK_AND_ASSIGN(Datum out, MinMax(array)); + const StructScalar& value = out.scalar_as(); + + auto expected = NaiveMinMax(array); + const auto& out_min = checked_cast(*value.value[0]); + ASSERT_EQ(expected.first, out_min.value); + + const auto& out_max = checked_cast(*value.value[1]); + ASSERT_EQ(expected.second, out_max.value); +} + +template +class TestRandomNumericMinMaxKernel : public ::testing::Test {}; + +TYPED_TEST_SUITE(TestRandomNumericMinMaxKernel, NumericArrowTypes); +TYPED_TEST(TestRandomNumericMinMaxKernel, RandomArrayMinMax) { + auto rand = random::RandomArrayGenerator(0x8afc055); + // Test size up to 1<<13 (8192). + for (size_t i = 3; i < 14; i += 2) { + for (auto null_probability : {0.0, 0.001, 0.1, 0.5, 0.999, 1.0}) { + for (auto length_adjust : {-2, -1, 0, 1, 2}) { + int64_t length = (1UL << i) + length_adjust; + auto array = rand.Numeric(length, 0, 100, null_probability); + ValidateMinMax(*array); + } + } + } +} + // // Mode // From b81cabe037a4bd335a2799a38ed54b182c4c5377 Mon Sep 17 00:00:00 2001 From: Frank Du Date: Mon, 13 Jul 2020 07:25:36 +0000 Subject: [PATCH 2/8] Move min/max to header and rework with BitBlockCounter Signed-off-by: Frank Du --- .../arrow/compute/kernels/aggregate_basic.cc | 204 -------------- .../kernels/aggregate_basic_internal.h | 260 ++++++++++++++++++ 2 files changed, 260 insertions(+), 204 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index e2cc2a334d8..8c773dbbc9e 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -#include - #include "arrow/compute/api_aggregate.h" #include "arrow/compute/kernels/aggregate_basic_internal.h" #include "arrow/compute/kernels/aggregate_internal.h" @@ -132,208 +130,6 @@ std::unique_ptr MeanInit(KernelContext* ctx, const KernelInitArgs& // ---------------------------------------------------------------------- // MinMax implementation -template -struct MinMaxState {}; - -template -struct MinMaxState> { - using ThisType = MinMaxState; - using T = typename ArrowType::c_type; - - ThisType& operator+=(const ThisType& rhs) { - this->has_nulls |= rhs.has_nulls; - this->has_values |= rhs.has_values; - this->min = this->min && rhs.min; - this->max = this->max || rhs.max; - return *this; - } - - void MergeOne(T value) { - this->min = this->min && value; - this->max = this->max || value; - } - - T min = true; - T max = false; - bool has_nulls = false; - bool has_values = false; -}; - -template -struct MinMaxState> { - using ThisType = MinMaxState; - using T = typename ArrowType::c_type; - - ThisType& operator+=(const ThisType& rhs) { - this->has_nulls |= rhs.has_nulls; - this->has_values |= rhs.has_values; - this->min = std::min(this->min, rhs.min); - this->max = std::max(this->max, rhs.max); - return *this; - } - - void MergeOne(T value) { - this->min = std::min(this->min, value); - this->max = std::max(this->max, value); - } - - T min = std::numeric_limits::max(); - T max = std::numeric_limits::min(); - bool has_nulls = false; - bool has_values = false; -}; - -template -struct MinMaxState> { - using ThisType = MinMaxState; - using T = typename ArrowType::c_type; - - ThisType& operator+=(const ThisType& rhs) { - this->has_nulls |= rhs.has_nulls; - this->has_values |= rhs.has_values; - this->min = std::fmin(this->min, rhs.min); - this->max = std::fmax(this->max, rhs.max); - return *this; - } - - void MergeOne(T value) { - this->min = std::fmin(this->min, value); - this->max = std::fmax(this->max, value); - } - - T min = std::numeric_limits::infinity(); - T max = -std::numeric_limits::infinity(); - bool has_nulls = false; - bool has_values = false; -}; - -template -struct MinMaxImpl : public ScalarAggregator { - using ArrayType = typename TypeTraits::ArrayType; - using ThisType = MinMaxImpl; - using StateType = MinMaxState; - - MinMaxImpl(const std::shared_ptr& out_type, const MinMaxOptions& options) - : out_type(out_type), options(options) {} - - void Consume(KernelContext*, const ExecBatch& batch) override { - StateType local; - - ArrayType arr(batch[0].array()); - - const auto null_count = arr.null_count(); - local.has_nulls = null_count > 0; - local.has_values = (arr.length() - null_count) > 0; - - if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) { - this->state = local; - return; - } - - if (local.has_nulls) { - BitmapReader reader(arr.null_bitmap_data(), arr.offset(), arr.length()); - for (int64_t i = 0; i < arr.length(); i++) { - if (reader.IsSet()) { - local.MergeOne(arr.Value(i)); - } - reader.Next(); - } - } else { - for (int64_t i = 0; i < arr.length(); i++) { - local.MergeOne(arr.Value(i)); - } - } - this->state = local; - } - - void MergeFrom(KernelContext*, const KernelState& src) override { - const auto& other = checked_cast(src); - this->state += other.state; - } - - void Finalize(KernelContext*, Datum* out) override { - using ScalarType = typename TypeTraits::ScalarType; - - std::vector> values; - if (!state.has_values || - (state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL)) { - // (null, null) - values = {std::make_shared(), std::make_shared()}; - } else { - values = {std::make_shared(state.min), - std::make_shared(state.max)}; - } - out->value = std::make_shared(std::move(values), this->out_type); - } - - std::shared_ptr out_type; - MinMaxOptions options; - MinMaxState state; -}; - -struct BooleanMinMaxImpl : public MinMaxImpl { - using MinMaxImpl::MinMaxImpl; - - void Consume(KernelContext*, const ExecBatch& batch) override { - StateType local; - ArrayType arr(batch[0].array()); - - const auto arr_length = arr.length(); - const auto null_count = arr.null_count(); - const auto valid_count = arr_length - null_count; - - local.has_nulls = null_count > 0; - local.has_values = valid_count > 0; - if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) { - this->state = local; - return; - } - - const auto true_count = arr.true_count(); - const auto false_count = valid_count - true_count; - local.max = true_count > 0; - local.min = false_count == 0; - - this->state = local; - } -}; - -struct MinMaxInitState { - std::unique_ptr state; - KernelContext* ctx; - const DataType& in_type; - const std::shared_ptr& out_type; - const MinMaxOptions& options; - - MinMaxInitState(KernelContext* ctx, const DataType& in_type, - const std::shared_ptr& out_type, const MinMaxOptions& options) - : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {} - - Status Visit(const DataType&) { - return Status::NotImplemented("No min/max implemented"); - } - - Status Visit(const HalfFloatType&) { - return Status::NotImplemented("No sum implemented"); - } - - Status Visit(const BooleanType&) { - state.reset(new BooleanMinMaxImpl(out_type, options)); - return Status::OK(); - } - - template - enable_if_number Visit(const Type&) { - state.reset(new MinMaxImpl(out_type, options)); - return Status::OK(); - } - - std::unique_ptr Create() { - ctx->SetStatus(VisitTypeInline(in_type, this)); - return std::move(state); - } -}; - std::unique_ptr MinMaxInit(KernelContext* ctx, const KernelInitArgs& args) { MinMaxInitState visitor(ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(), diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index 29db97381d6..fd7839a3b49 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -17,6 +17,9 @@ #pragma once +#include + +#include "arrow/compute/api_aggregate.h" #include "arrow/compute/kernels/aggregate_internal.h" #include "arrow/compute/kernels/common.h" #include "arrow/util/align_util.h" @@ -312,6 +315,263 @@ struct SumLikeInit { } }; +// ---------------------------------------------------------------------- +// MinMax implementation + +template +struct MinMaxState {}; + +template +struct MinMaxState> { + using ThisType = MinMaxState; + using T = typename ArrowType::c_type; + + ThisType& operator+=(const ThisType& rhs) { + this->has_nulls |= rhs.has_nulls; + this->has_values |= rhs.has_values; + this->min = this->min && rhs.min; + this->max = this->max || rhs.max; + return *this; + } + + void MergeOne(T value) { + this->min = this->min && value; + this->max = this->max || value; + } + + T min = true; + T max = false; + bool has_nulls = false; + bool has_values = false; +}; + +template +struct MinMaxState> { + using ThisType = MinMaxState; + using T = typename ArrowType::c_type; + + ThisType& operator+=(const ThisType& rhs) { + this->has_nulls |= rhs.has_nulls; + this->has_values |= rhs.has_values; + this->min = std::min(this->min, rhs.min); + this->max = std::max(this->max, rhs.max); + return *this; + } + + void MergeOne(T value) { + this->min = std::min(this->min, value); + this->max = std::max(this->max, value); + } + + T min = std::numeric_limits::max(); + T max = std::numeric_limits::min(); + bool has_nulls = false; + bool has_values = false; +}; + +template +struct MinMaxState> { + using ThisType = MinMaxState; + using T = typename ArrowType::c_type; + + ThisType& operator+=(const ThisType& rhs) { + this->has_nulls |= rhs.has_nulls; + this->has_values |= rhs.has_values; + this->min = std::fmin(this->min, rhs.min); + this->max = std::fmax(this->max, rhs.max); + return *this; + } + + void MergeOne(T value) { + this->min = std::fmin(this->min, value); + this->max = std::fmax(this->max, value); + } + + T min = std::numeric_limits::infinity(); + T max = -std::numeric_limits::infinity(); + bool has_nulls = false; + bool has_values = false; +}; + +template +struct MinMaxImpl : public ScalarAggregator { + using ArrayType = typename TypeTraits::ArrayType; + using ThisType = MinMaxImpl; + using StateType = MinMaxState; + + MinMaxImpl(const std::shared_ptr& out_type, const MinMaxOptions& options) + : out_type(out_type), options(options) {} + + void Consume(KernelContext*, const ExecBatch& batch) override { + StateType local; + + ArrayType arr(batch[0].array()); + + const auto null_count = arr.null_count(); + local.has_nulls = null_count > 0; + local.has_values = (arr.length() - null_count) > 0; + + if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) { + this->state = local; + return; + } + + if (local.has_nulls) { + local += ConsumeWithNulls(arr); + } else { // All true values + for (int64_t i = 0; i < arr.length(); i++) { + local.MergeOne(arr.Value(i)); + } + } + this->state = local; + } + + void MergeFrom(KernelContext*, const KernelState& src) override { + const auto& other = checked_cast(src); + this->state += other.state; + } + + void Finalize(KernelContext*, Datum* out) override { + using ScalarType = typename TypeTraits::ScalarType; + + std::vector> values; + if (!state.has_values || + (state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL)) { + // (null, null) + values = {std::make_shared(), std::make_shared()}; + } else { + values = {std::make_shared(state.min), + std::make_shared(state.max)}; + } + out->value = std::make_shared(std::move(values), this->out_type); + } + + std::shared_ptr out_type; + MinMaxOptions options; + MinMaxState state; + + private: + StateType ConsumeWithNulls(const ArrayType& arr) const { + StateType local; + const int64_t length = arr.length(); + int64_t offset = arr.offset(); + const uint8_t* bitmap = arr.null_bitmap_data(); + int64_t idx = 0; + + const auto p = arrow::internal::BitmapWordAlign<1>(bitmap, offset, length); + // First handle the leading bits + const int64_t leading_bits = p.leading_bits; + while (idx < leading_bits) { + if (BitUtil::GetBit(bitmap, offset)) { + local.MergeOne(arr.Value(idx)); + } + idx++; + offset++; + } + + // The aligned parts scanned with BitBlockCounter + arrow::internal::BitBlockCounter data_counter(bitmap, offset, length - leading_bits); + auto current_block = data_counter.NextWord(); + while (idx < length) { + if (current_block.AllSet()) { // All true values + int run_length = 0; + // Scan forward until a block that has some false values (or the end) + while (current_block.length > 0 && current_block.AllSet()) { + run_length += current_block.length; + current_block = data_counter.NextWord(); + } + for (int64_t i = 0; i < run_length; i++) { + local.MergeOne(arr.Value(idx + i)); + } + idx += run_length; + offset += run_length; + // The current_block already computed, advance to next loop + continue; + } else if (!current_block.NoneSet()) { // Some values are null + BitmapReader reader(arr.null_bitmap_data(), offset, current_block.length); + for (int64_t i = 0; i < current_block.length; i++) { + if (reader.IsSet()) { + local.MergeOne(arr.Value(idx + i)); + } + reader.Next(); + } + + idx += current_block.length; + offset += current_block.length; + } else { // All null values + idx += current_block.length; + offset += current_block.length; + } + current_block = data_counter.NextWord(); + } + + return local; + } +}; + +struct BooleanMinMaxImpl : public MinMaxImpl { + using MinMaxImpl::MinMaxImpl; + + void Consume(KernelContext*, const ExecBatch& batch) override { + StateType local; + ArrayType arr(batch[0].array()); + + const auto arr_length = arr.length(); + const auto null_count = arr.null_count(); + const auto valid_count = arr_length - null_count; + + local.has_nulls = null_count > 0; + local.has_values = valid_count > 0; + if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) { + this->state = local; + return; + } + + const auto true_count = arr.true_count(); + const auto false_count = valid_count - true_count; + local.max = true_count > 0; + local.min = false_count == 0; + + this->state = local; + } +}; + +struct MinMaxInitState { + std::unique_ptr state; + KernelContext* ctx; + const DataType& in_type; + const std::shared_ptr& out_type; + const MinMaxOptions& options; + + MinMaxInitState(KernelContext* ctx, const DataType& in_type, + const std::shared_ptr& out_type, const MinMaxOptions& options) + : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {} + + Status Visit(const DataType&) { + return Status::NotImplemented("No min/max implemented"); + } + + Status Visit(const HalfFloatType&) { + return Status::NotImplemented("No min/max implemented"); + } + + Status Visit(const BooleanType&) { + state.reset(new BooleanMinMaxImpl(out_type, options)); + return Status::OK(); + } + + template + enable_if_number Visit(const Type&) { + state.reset(new MinMaxImpl(out_type, options)); + return Status::OK(); + } + + std::unique_ptr Create() { + ctx->SetStatus(VisitTypeInline(in_type, this)); + return std::move(state); + } +}; + } // namespace aggregate } // namespace compute } // namespace arrow From 27d3c592a71d7d303b302d9168c44d113d98aa3a Mon Sep 17 00:00:00 2001 From: Frank Du Date: Tue, 14 Jul 2020 06:39:44 +0000 Subject: [PATCH 3/8] Add avx version Signed-off-by: Frank Du --- cpp/src/arrow/CMakeLists.txt | 12 ++-- .../arrow/compute/kernels/aggregate_basic.cc | 21 ++++-- ...te_sum_avx2.cc => aggregate_basic_avx2.cc} | 17 +++++ ...um_avx512.cc => aggregate_basic_avx512.cc} | 17 +++++ .../kernels/aggregate_basic_internal.h | 68 +++++++++++-------- 5 files changed, 96 insertions(+), 39 deletions(-) rename cpp/src/arrow/compute/kernels/{aggregate_sum_avx2.cc => aggregate_basic_avx2.cc} (80%) rename cpp/src/arrow/compute/kernels/{aggregate_sum_avx512.cc => aggregate_basic_avx512.cc} (80%) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index fdc0ed0e558..26e16b56c22 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -387,17 +387,17 @@ if(ARROW_COMPUTE) compute/kernels/vector_sort.cc) if(CXX_SUPPORTS_AVX2) - list(APPEND ARROW_SRCS compute/kernels/aggregate_sum_avx2.cc) - set_source_files_properties(compute/kernels/aggregate_sum_avx2.cc PROPERTIES + list(APPEND ARROW_SRCS compute/kernels/aggregate_basic_avx2.cc) + set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON) - set_source_files_properties(compute/kernels/aggregate_sum_avx2.cc PROPERTIES + set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc PROPERTIES COMPILE_FLAGS ${ARROW_AVX2_FLAG}) endif() if(CXX_SUPPORTS_AVX512) - list(APPEND ARROW_SRCS compute/kernels/aggregate_sum_avx512.cc) - set_source_files_properties(compute/kernels/aggregate_sum_avx512.cc PROPERTIES + list(APPEND ARROW_SRCS compute/kernels/aggregate_basic_avx512.cc) + set_source_files_properties(compute/kernels/aggregate_basic_avx512.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON) - set_source_files_properties(compute/kernels/aggregate_sum_avx512.cc PROPERTIES + set_source_files_properties(compute/kernels/aggregate_basic_avx512.cc PROPERTIES COMPILE_FLAGS ${ARROW_AVX512_FLAG}) endif() endif() diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index 8c773dbbc9e..33afd68bc44 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -131,9 +131,9 @@ std::unique_ptr MeanInit(KernelContext* ctx, const KernelInitArgs& // MinMax implementation std::unique_ptr MinMaxInit(KernelContext* ctx, const KernelInitArgs& args) { - MinMaxInitState visitor(ctx, *args.inputs[0].type, - args.kernel->signature->out_type().type(), - static_cast(*args.options)); + MinMaxInitState visitor( + ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(), + static_cast(*args.options)); return visitor.Create(); } @@ -159,8 +159,7 @@ void AddBasicAggKernels(KernelInit init, void AddMinMaxKernels(KernelInit init, const std::vector>& types, - ScalarAggregateFunction* func, - SimdLevel::type simd_level = SimdLevel::NONE) { + ScalarAggregateFunction* func, SimdLevel::type simd_level) { for (const auto& ty : types) { // array[T] -> scalar[struct] auto out_ty = struct_({field("min", ty), field("max", ty)}); @@ -227,6 +226,18 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { &default_minmax_options); aggregate::AddMinMaxKernels(aggregate::MinMaxInit, {boolean()}, func.get()); aggregate::AddMinMaxKernels(aggregate::MinMaxInit, NumericTypes(), func.get()); + // Add the SIMD variants for min max +#if defined(ARROW_HAVE_RUNTIME_AVX2) + if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) { + aggregate::AddMinMaxAvx2AggKernels(func.get()); + } +#endif +#if defined(ARROW_HAVE_RUNTIME_AVX512) + if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX512)) { + aggregate::AddMinMaxAvx512AggKernels(func.get()); + } +#endif + DCHECK_OK(registry->AddFunction(std::move(func))); DCHECK_OK(registry->AddFunction(aggregate::AddModeAggKernels())); diff --git a/cpp/src/arrow/compute/kernels/aggregate_sum_avx2.cc b/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc similarity index 80% rename from cpp/src/arrow/compute/kernels/aggregate_sum_avx2.cc rename to cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc index 2811c4cd865..e0c1118c714 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_sum_avx2.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc @@ -67,6 +67,17 @@ std::unique_ptr MeanInitAvx2(KernelContext* ctx, return visitor.Create(); } +// ---------------------------------------------------------------------- +// MinMax implementation + +std::unique_ptr MinMaxInitAvx2(KernelContext* ctx, + const KernelInitArgs& args) { + MinMaxInitState visitor( + ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(), + static_cast(*args.options)); + return visitor.Create(); +} + void AddSumAvx2AggKernels(ScalarAggregateFunction* func) { AddBasicAggKernels(SumInitAvx2, internal::SignedIntTypes(), int64(), func, SimdLevel::AVX2); @@ -81,6 +92,12 @@ void AddMeanAvx2AggKernels(ScalarAggregateFunction* func) { SimdLevel::AVX2); } +void AddMinMaxAvx2AggKernels(ScalarAggregateFunction* func) { + // Enable int types for AVX2 variants. + // No auto vectorize for float/double as it use fmin/fmax which has NaN handling. + AddMinMaxKernels(MinMaxInitAvx2, internal::IntTypes(), func, SimdLevel::AVX2); +} + } // namespace aggregate } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_sum_avx512.cc b/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc similarity index 80% rename from cpp/src/arrow/compute/kernels/aggregate_sum_avx512.cc rename to cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc index 00408027e1f..c2c748d3af7 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_sum_avx512.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc @@ -68,6 +68,17 @@ std::unique_ptr MeanInitAvx512(KernelContext* ctx, return visitor.Create(); } +// ---------------------------------------------------------------------- +// MinMax implementation + +std::unique_ptr MinMaxInitAvx512(KernelContext* ctx, + const KernelInitArgs& args) { + MinMaxInitState visitor( + ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(), + static_cast(*args.options)); + return visitor.Create(); +} + void AddSumAvx512AggKernels(ScalarAggregateFunction* func) { AddBasicAggKernels(SumInitAvx512, internal::SignedIntTypes(), int64(), func, SimdLevel::AVX512); @@ -82,6 +93,12 @@ void AddMeanAvx512AggKernels(ScalarAggregateFunction* func) { SimdLevel::AVX512); } +void AddMinMaxAvx512AggKernels(ScalarAggregateFunction* func) { + // Enable 32/64 int types for avx512 variants, no advantage on 8/16 int. + AddMinMaxKernels(MinMaxInitAvx512, {int32(), uint32(), int64(), uint64()}, func, + SimdLevel::AVX512); +} + } // namespace aggregate } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index fd7839a3b49..0d8d43d95d6 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -44,22 +44,29 @@ void AddBasicAggKernels(KernelInit init, std::shared_ptr out_ty, ScalarAggregateFunction* func, SimdLevel::type simd_level = SimdLevel::NONE); +void AddMinMaxKernels(KernelInit init, + const std::vector>& types, + ScalarAggregateFunction* func, + SimdLevel::type simd_level = SimdLevel::NONE); + // SIMD variants for kernels void AddSumAvx2AggKernels(ScalarAggregateFunction* func); void AddMeanAvx2AggKernels(ScalarAggregateFunction* func); +void AddMinMaxAvx2AggKernels(ScalarAggregateFunction* func); void AddSumAvx512AggKernels(ScalarAggregateFunction* func); void AddMeanAvx512AggKernels(ScalarAggregateFunction* func); +void AddMinMaxAvx512AggKernels(ScalarAggregateFunction* func); std::shared_ptr AddModeAggKernels(); // ---------------------------------------------------------------------- // Sum implementation -template +template struct SumState { using SumType = typename FindAccumulatorType::Type; - using ThisType = SumState; + using ThisType = SumState; using T = typename TypeTraits::CType; using ArrayType = typename TypeTraits::ArrayType; @@ -220,10 +227,10 @@ struct SumState { } }; -template -struct SumState { +template +struct SumState { using SumType = typename FindAccumulatorType::Type; - using ThisType = SumState; + using ThisType = SumState; ThisType& operator+=(const ThisType& rhs) { this->count += rhs.count; @@ -242,10 +249,10 @@ struct SumState { typename SumType::c_type sum = 0; }; -template +template struct SumImpl : public ScalarAggregator { using ArrayType = typename TypeTraits::ArrayType; - using ThisType = SumImpl; + using ThisType = SumImpl; using SumType = typename FindAccumulatorType::Type; using OutputType = typename TypeTraits::ScalarType; @@ -266,11 +273,11 @@ struct SumImpl : public ScalarAggregator { } } - SumState state; + SumState state; }; -template -struct MeanImpl : public SumImpl { +template +struct MeanImpl : public SumImpl { void Finalize(KernelContext*, Datum* out) override { const bool is_valid = this->state.count > 0; const double divisor = static_cast(is_valid ? this->state.count : 1UL); @@ -318,12 +325,12 @@ struct SumLikeInit { // ---------------------------------------------------------------------- // MinMax implementation -template +template struct MinMaxState {}; -template -struct MinMaxState> { - using ThisType = MinMaxState; +template +struct MinMaxState> { + using ThisType = MinMaxState; using T = typename ArrowType::c_type; ThisType& operator+=(const ThisType& rhs) { @@ -345,9 +352,9 @@ struct MinMaxState> { bool has_values = false; }; -template -struct MinMaxState> { - using ThisType = MinMaxState; +template +struct MinMaxState> { + using ThisType = MinMaxState; using T = typename ArrowType::c_type; ThisType& operator+=(const ThisType& rhs) { @@ -369,9 +376,9 @@ struct MinMaxState> { bool has_values = false; }; -template -struct MinMaxState> { - using ThisType = MinMaxState; +template +struct MinMaxState> { + using ThisType = MinMaxState; using T = typename ArrowType::c_type; ThisType& operator+=(const ThisType& rhs) { @@ -393,11 +400,11 @@ struct MinMaxState> { bool has_values = false; }; -template +template struct MinMaxImpl : public ScalarAggregator { using ArrayType = typename TypeTraits::ArrayType; - using ThisType = MinMaxImpl; - using StateType = MinMaxState; + using ThisType = MinMaxImpl; + using StateType = MinMaxState; MinMaxImpl(const std::shared_ptr& out_type, const MinMaxOptions& options) : out_type(out_type), options(options) {} @@ -448,7 +455,7 @@ struct MinMaxImpl : public ScalarAggregator { std::shared_ptr out_type; MinMaxOptions options; - MinMaxState state; + MinMaxState state; private: StateType ConsumeWithNulls(const ArrayType& arr) const { @@ -509,8 +516,12 @@ struct MinMaxImpl : public ScalarAggregator { } }; -struct BooleanMinMaxImpl : public MinMaxImpl { - using MinMaxImpl::MinMaxImpl; +template +struct BooleanMinMaxImpl : public MinMaxImpl { + using StateType = MinMaxState; + using ArrayType = typename TypeTraits::ArrayType; + using MinMaxImpl::MinMaxImpl; + using MinMaxImpl::options; void Consume(KernelContext*, const ExecBatch& batch) override { StateType local; @@ -536,6 +547,7 @@ struct BooleanMinMaxImpl : public MinMaxImpl { } }; +template struct MinMaxInitState { std::unique_ptr state; KernelContext* ctx; @@ -556,13 +568,13 @@ struct MinMaxInitState { } Status Visit(const BooleanType&) { - state.reset(new BooleanMinMaxImpl(out_type, options)); + state.reset(new BooleanMinMaxImpl(out_type, options)); return Status::OK(); } template enable_if_number Visit(const Type&) { - state.reset(new MinMaxImpl(out_type, options)); + state.reset(new MinMaxImpl(out_type, options)); return Status::OK(); } From 09cba634647d7088f7196e061c7e895e3bcae64b Mon Sep 17 00:00:00 2001 From: Frank Du Date: Tue, 1 Sep 2020 01:54:03 +0000 Subject: [PATCH 4/8] Add ValidateMinMaxIsNull for RandomNullArrayMinMax Signed-off-by: Frank Du --- .../arrow/compute/kernels/aggregate_test.cc | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index afa94c84d93..781e2853727 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -683,6 +683,14 @@ void ValidateMinMax(const Array& array) { ASSERT_EQ(expected.second, out_max.value); } +void ValidateMinMaxIsNull(const Array& array) { + ASSERT_OK_AND_ASSIGN(Datum out, MinMax(array)); + const StructScalar& value = out.scalar_as(); + for (const auto& val : value.value) { + ASSERT_FALSE(val->is_valid); + } +} + template class TestRandomNumericMinMaxKernel : public ::testing::Test {}; @@ -691,7 +699,7 @@ TYPED_TEST(TestRandomNumericMinMaxKernel, RandomArrayMinMax) { auto rand = random::RandomArrayGenerator(0x8afc055); // Test size up to 1<<13 (8192). for (size_t i = 3; i < 14; i += 2) { - for (auto null_probability : {0.0, 0.001, 0.1, 0.5, 0.999, 1.0}) { + for (auto null_probability : {0.0, 0.001, 0.1, 0.5, 0.999}) { for (auto length_adjust : {-2, -1, 0, 1, 2}) { int64_t length = (1UL << i) + length_adjust; auto array = rand.Numeric(length, 0, 100, null_probability); @@ -701,6 +709,19 @@ TYPED_TEST(TestRandomNumericMinMaxKernel, RandomArrayMinMax) { } } +TYPED_TEST_SUITE(TestRandomNumericMinMaxKernel, NumericArrowTypes); +TYPED_TEST(TestRandomNumericMinMaxKernel, RandomNullArrayMinMax) { + auto rand = random::RandomArrayGenerator(0x8afc055); + // Test size up to 1<<10 (1024). + for (size_t i = 3; i < 11; i += 2) { + for (auto length_adjust : {-2, -1, 0, 1, 2}) { + int64_t length = (1UL << i) + length_adjust; + auto array = rand.Numeric(length, 0, 100, 1.0); + ValidateMinMaxIsNull(*array); + } + } +} + // // Mode // From c70342b17d6b57a3371050a193cfb8276f88b7b0 Mon Sep 17 00:00:00 2001 From: Frank Du Date: Wed, 2 Sep 2020 03:18:18 +0000 Subject: [PATCH 5/8] Revert "Add ValidateMinMaxIsNull for RandomNullArrayMinMax" This reverts commit 8b5b1a6aa491e76599c1988baf9c5df5a970e672. --- .../arrow/compute/kernels/aggregate_test.cc | 23 +------------------ 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 781e2853727..afa94c84d93 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -683,14 +683,6 @@ void ValidateMinMax(const Array& array) { ASSERT_EQ(expected.second, out_max.value); } -void ValidateMinMaxIsNull(const Array& array) { - ASSERT_OK_AND_ASSIGN(Datum out, MinMax(array)); - const StructScalar& value = out.scalar_as(); - for (const auto& val : value.value) { - ASSERT_FALSE(val->is_valid); - } -} - template class TestRandomNumericMinMaxKernel : public ::testing::Test {}; @@ -699,7 +691,7 @@ TYPED_TEST(TestRandomNumericMinMaxKernel, RandomArrayMinMax) { auto rand = random::RandomArrayGenerator(0x8afc055); // Test size up to 1<<13 (8192). for (size_t i = 3; i < 14; i += 2) { - for (auto null_probability : {0.0, 0.001, 0.1, 0.5, 0.999}) { + for (auto null_probability : {0.0, 0.001, 0.1, 0.5, 0.999, 1.0}) { for (auto length_adjust : {-2, -1, 0, 1, 2}) { int64_t length = (1UL << i) + length_adjust; auto array = rand.Numeric(length, 0, 100, null_probability); @@ -709,19 +701,6 @@ TYPED_TEST(TestRandomNumericMinMaxKernel, RandomArrayMinMax) { } } -TYPED_TEST_SUITE(TestRandomNumericMinMaxKernel, NumericArrowTypes); -TYPED_TEST(TestRandomNumericMinMaxKernel, RandomNullArrayMinMax) { - auto rand = random::RandomArrayGenerator(0x8afc055); - // Test size up to 1<<10 (1024). - for (size_t i = 3; i < 11; i += 2) { - for (auto length_adjust : {-2, -1, 0, 1, 2}) { - int64_t length = (1UL << i) + length_adjust; - auto array = rand.Numeric(length, 0, 100, 1.0); - ValidateMinMaxIsNull(*array); - } - } -} - // // Mode // From 9c68794f98e959e48fc1a38a005a1659acc38814 Mon Sep 17 00:00:00 2001 From: Frank Du Date: Wed, 2 Sep 2020 06:41:35 +0000 Subject: [PATCH 6/8] Add is_valid for the null data test Signed-off-by: Frank Du --- .../arrow/compute/kernels/aggregate_test.cc | 39 +++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index afa94c84d93..19a730ea519 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -595,7 +595,13 @@ TYPED_TEST(TestFloatingMinMaxKernel, DefaultOptions) { } template -using MinMaxResult = std::pair; +struct MinMaxResult { + using T = typename ArrowType::c_type; + + T min = 0; + T max = 0; + bool is_valid = false; +}; template static enable_if_integer> NaiveMinMax( @@ -603,11 +609,13 @@ static enable_if_integer> NaiveMinMax( using T = typename ArrowType::c_type; using ArrayType = typename TypeTraits::ArrayType; + MinMaxResult result; + const auto& array_numeric = reinterpret_cast(array); const auto values = array_numeric.raw_values(); if (array.length() <= array.null_count()) { // All null values - return {static_cast(0), static_cast(0)}; + return result; } T min = std::numeric_limits::max(); @@ -629,7 +637,10 @@ static enable_if_integer> NaiveMinMax( } } - return {min, max}; + result.min = min; + result.max = max; + result.is_valid = true; + return result; } template @@ -638,11 +649,13 @@ static enable_if_floating_point> NaiveMinMax( using T = typename ArrowType::c_type; using ArrayType = typename TypeTraits::ArrayType; + MinMaxResult result; + const auto& array_numeric = reinterpret_cast(array); const auto values = array_numeric.raw_values(); if (array.length() <= array.null_count()) { // All null values - return {static_cast(0), static_cast(0)}; + return result; } T min = std::numeric_limits::infinity(); @@ -664,7 +677,10 @@ static enable_if_floating_point> NaiveMinMax( } } - return {min, max}; + result.min = min; + result.max = max; + result.is_valid = true; + return result; } template @@ -677,10 +693,17 @@ void ValidateMinMax(const Array& array) { auto expected = NaiveMinMax(array); const auto& out_min = checked_cast(*value.value[0]); - ASSERT_EQ(expected.first, out_min.value); - const auto& out_max = checked_cast(*value.value[1]); - ASSERT_EQ(expected.second, out_max.value); + + if (expected.is_valid) { // All null values + ASSERT_TRUE(out_min.is_valid); + ASSERT_TRUE(out_max.is_valid); + } else { + ASSERT_FALSE(out_min.is_valid); + ASSERT_FALSE(out_max.is_valid); + } + ASSERT_EQ(expected.min, out_min.value); + ASSERT_EQ(expected.max, out_max.value); } template From b3f05dd915aee77d68fa79e785aac57cb255f63f Mon Sep 17 00:00:00 2001 From: Frank Du Date: Wed, 2 Sep 2020 07:55:37 +0000 Subject: [PATCH 7/8] Correct a comment Signed-off-by: Frank Du --- cpp/src/arrow/compute/kernels/aggregate_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 19a730ea519..d23a56dffc5 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -695,10 +695,10 @@ void ValidateMinMax(const Array& array) { const auto& out_min = checked_cast(*value.value[0]); const auto& out_max = checked_cast(*value.value[1]); - if (expected.is_valid) { // All null values + if (expected.is_valid) { ASSERT_TRUE(out_min.is_valid); ASSERT_TRUE(out_max.is_valid); - } else { + } else { // All null values ASSERT_FALSE(out_min.is_valid); ASSERT_FALSE(out_max.is_valid); } From ab6f2beaf2a7d48f66a32f759c51503578b14eab Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 2 Sep 2020 13:20:43 +0200 Subject: [PATCH 8/8] Some nits --- cpp/src/arrow/compute/kernels/aggregate_test.cc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index d23a56dffc5..d5d46fe8843 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -698,12 +698,12 @@ void ValidateMinMax(const Array& array) { if (expected.is_valid) { ASSERT_TRUE(out_min.is_valid); ASSERT_TRUE(out_max.is_valid); + ASSERT_EQ(expected.min, out_min.value); + ASSERT_EQ(expected.max, out_max.value); } else { // All null values ASSERT_FALSE(out_min.is_valid); ASSERT_FALSE(out_max.is_valid); } - ASSERT_EQ(expected.min, out_min.value); - ASSERT_EQ(expected.max, out_max.value); } template @@ -712,13 +712,14 @@ class TestRandomNumericMinMaxKernel : public ::testing::Test {}; TYPED_TEST_SUITE(TestRandomNumericMinMaxKernel, NumericArrowTypes); TYPED_TEST(TestRandomNumericMinMaxKernel, RandomArrayMinMax) { auto rand = random::RandomArrayGenerator(0x8afc055); - // Test size up to 1<<13 (8192). - for (size_t i = 3; i < 14; i += 2) { - for (auto null_probability : {0.0, 0.001, 0.1, 0.5, 0.999, 1.0}) { + // Test size up to 1<<11 (2048). + for (size_t i = 3; i < 12; i += 2) { + for (auto null_probability : {0.0, 0.01, 0.1, 0.5, 0.99, 1.0}) { + int64_t base_length = (1UL << i) + 2; + auto array = rand.Numeric(base_length, 0, 100, null_probability); for (auto length_adjust : {-2, -1, 0, 1, 2}) { int64_t length = (1UL << i) + length_adjust; - auto array = rand.Numeric(length, 0, 100, null_probability); - ValidateMinMax(*array); + ValidateMinMax(*array->Slice(0, length)); } } }