diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index fdc0ed0e558..26e16b56c22 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -387,17 +387,17 @@ if(ARROW_COMPUTE) compute/kernels/vector_sort.cc) if(CXX_SUPPORTS_AVX2) - list(APPEND ARROW_SRCS compute/kernels/aggregate_sum_avx2.cc) - set_source_files_properties(compute/kernels/aggregate_sum_avx2.cc PROPERTIES + list(APPEND ARROW_SRCS compute/kernels/aggregate_basic_avx2.cc) + set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON) - set_source_files_properties(compute/kernels/aggregate_sum_avx2.cc PROPERTIES + set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc PROPERTIES COMPILE_FLAGS ${ARROW_AVX2_FLAG}) endif() if(CXX_SUPPORTS_AVX512) - list(APPEND ARROW_SRCS compute/kernels/aggregate_sum_avx512.cc) - set_source_files_properties(compute/kernels/aggregate_sum_avx512.cc PROPERTIES + list(APPEND ARROW_SRCS compute/kernels/aggregate_basic_avx512.cc) + set_source_files_properties(compute/kernels/aggregate_basic_avx512.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON) - set_source_files_properties(compute/kernels/aggregate_sum_avx512.cc PROPERTIES + set_source_files_properties(compute/kernels/aggregate_basic_avx512.cc PROPERTIES COMPILE_FLAGS ${ARROW_AVX512_FLAG}) endif() endif() diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index 5ae3cf9b5fe..675bd25a684 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -130,23 +130,6 @@ Result MinMax(const Datum& value, const MinMaxOptions& options = MinMaxOptions::Defaults(), ExecContext* ctx = NULLPTR); -/// \brief Calculate the min / max of a numeric array. -/// -/// This function returns both the min and max as a collection. The resulting -/// datum thus consists of two scalar datums: {Datum(min), Datum(max)} -/// -/// \param[in] array input array -/// \param[in] options see MinMaxOptions for more information -/// \param[in] ctx the function execution context, optional -/// \return resulting datum containing a {min, max} collection -/// -/// \since 1.0.0 -/// \note API not yet finalized -ARROW_EXPORT -Result MinMax(const Array& array, - const MinMaxOptions& options = MinMaxOptions::Defaults(), - ExecContext* ctx = NULLPTR); - /// \brief Calculate the modal (most common) value of a numeric array /// /// This function returns both mode and count as a struct scalar, with type diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index e2cc2a334d8..33afd68bc44 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -#include - #include "arrow/compute/api_aggregate.h" #include "arrow/compute/kernels/aggregate_basic_internal.h" #include "arrow/compute/kernels/aggregate_internal.h" @@ -132,212 +130,10 @@ std::unique_ptr MeanInit(KernelContext* ctx, const KernelInitArgs& // ---------------------------------------------------------------------- // MinMax implementation -template -struct MinMaxState {}; - -template -struct MinMaxState> { - using ThisType = MinMaxState; - using T = typename ArrowType::c_type; - - ThisType& operator+=(const ThisType& rhs) { - this->has_nulls |= rhs.has_nulls; - this->has_values |= rhs.has_values; - this->min = this->min && rhs.min; - this->max = this->max || rhs.max; - return *this; - } - - void MergeOne(T value) { - this->min = this->min && value; - this->max = this->max || value; - } - - T min = true; - T max = false; - bool has_nulls = false; - bool has_values = false; -}; - -template -struct MinMaxState> { - using ThisType = MinMaxState; - using T = typename ArrowType::c_type; - - ThisType& operator+=(const ThisType& rhs) { - this->has_nulls |= rhs.has_nulls; - this->has_values |= rhs.has_values; - this->min = std::min(this->min, rhs.min); - this->max = std::max(this->max, rhs.max); - return *this; - } - - void MergeOne(T value) { - this->min = std::min(this->min, value); - this->max = std::max(this->max, value); - } - - T min = std::numeric_limits::max(); - T max = std::numeric_limits::min(); - bool has_nulls = false; - bool has_values = false; -}; - -template -struct MinMaxState> { - using ThisType = MinMaxState; - using T = typename ArrowType::c_type; - - ThisType& operator+=(const ThisType& rhs) { - this->has_nulls |= rhs.has_nulls; - this->has_values |= rhs.has_values; - this->min = std::fmin(this->min, rhs.min); - this->max = std::fmax(this->max, rhs.max); - return *this; - } - - void MergeOne(T value) { - this->min = std::fmin(this->min, value); - this->max = std::fmax(this->max, value); - } - - T min = std::numeric_limits::infinity(); - T max = -std::numeric_limits::infinity(); - bool has_nulls = false; - bool has_values = false; -}; - -template -struct MinMaxImpl : public ScalarAggregator { - using ArrayType = typename TypeTraits::ArrayType; - using ThisType = MinMaxImpl; - using StateType = MinMaxState; - - MinMaxImpl(const std::shared_ptr& out_type, const MinMaxOptions& options) - : out_type(out_type), options(options) {} - - void Consume(KernelContext*, const ExecBatch& batch) override { - StateType local; - - ArrayType arr(batch[0].array()); - - const auto null_count = arr.null_count(); - local.has_nulls = null_count > 0; - local.has_values = (arr.length() - null_count) > 0; - - if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) { - this->state = local; - return; - } - - if (local.has_nulls) { - BitmapReader reader(arr.null_bitmap_data(), arr.offset(), arr.length()); - for (int64_t i = 0; i < arr.length(); i++) { - if (reader.IsSet()) { - local.MergeOne(arr.Value(i)); - } - reader.Next(); - } - } else { - for (int64_t i = 0; i < arr.length(); i++) { - local.MergeOne(arr.Value(i)); - } - } - this->state = local; - } - - void MergeFrom(KernelContext*, const KernelState& src) override { - const auto& other = checked_cast(src); - this->state += other.state; - } - - void Finalize(KernelContext*, Datum* out) override { - using ScalarType = typename TypeTraits::ScalarType; - - std::vector> values; - if (!state.has_values || - (state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL)) { - // (null, null) - values = {std::make_shared(), std::make_shared()}; - } else { - values = {std::make_shared(state.min), - std::make_shared(state.max)}; - } - out->value = std::make_shared(std::move(values), this->out_type); - } - - std::shared_ptr out_type; - MinMaxOptions options; - MinMaxState state; -}; - -struct BooleanMinMaxImpl : public MinMaxImpl { - using MinMaxImpl::MinMaxImpl; - - void Consume(KernelContext*, const ExecBatch& batch) override { - StateType local; - ArrayType arr(batch[0].array()); - - const auto arr_length = arr.length(); - const auto null_count = arr.null_count(); - const auto valid_count = arr_length - null_count; - - local.has_nulls = null_count > 0; - local.has_values = valid_count > 0; - if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) { - this->state = local; - return; - } - - const auto true_count = arr.true_count(); - const auto false_count = valid_count - true_count; - local.max = true_count > 0; - local.min = false_count == 0; - - this->state = local; - } -}; - -struct MinMaxInitState { - std::unique_ptr state; - KernelContext* ctx; - const DataType& in_type; - const std::shared_ptr& out_type; - const MinMaxOptions& options; - - MinMaxInitState(KernelContext* ctx, const DataType& in_type, - const std::shared_ptr& out_type, const MinMaxOptions& options) - : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {} - - Status Visit(const DataType&) { - return Status::NotImplemented("No min/max implemented"); - } - - Status Visit(const HalfFloatType&) { - return Status::NotImplemented("No sum implemented"); - } - - Status Visit(const BooleanType&) { - state.reset(new BooleanMinMaxImpl(out_type, options)); - return Status::OK(); - } - - template - enable_if_number Visit(const Type&) { - state.reset(new MinMaxImpl(out_type, options)); - return Status::OK(); - } - - std::unique_ptr Create() { - ctx->SetStatus(VisitTypeInline(in_type, this)); - return std::move(state); - } -}; - std::unique_ptr MinMaxInit(KernelContext* ctx, const KernelInitArgs& args) { - MinMaxInitState visitor(ctx, *args.inputs[0].type, - args.kernel->signature->out_type().type(), - static_cast(*args.options)); + MinMaxInitState visitor( + ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(), + static_cast(*args.options)); return visitor.Create(); } @@ -363,8 +159,7 @@ void AddBasicAggKernels(KernelInit init, void AddMinMaxKernels(KernelInit init, const std::vector>& types, - ScalarAggregateFunction* func, - SimdLevel::type simd_level = SimdLevel::NONE) { + ScalarAggregateFunction* func, SimdLevel::type simd_level) { for (const auto& ty : types) { // array[T] -> scalar[struct] auto out_ty = struct_({field("min", ty), field("max", ty)}); @@ -431,6 +226,18 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { &default_minmax_options); aggregate::AddMinMaxKernels(aggregate::MinMaxInit, {boolean()}, func.get()); aggregate::AddMinMaxKernels(aggregate::MinMaxInit, NumericTypes(), func.get()); + // Add the SIMD variants for min max +#if defined(ARROW_HAVE_RUNTIME_AVX2) + if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) { + aggregate::AddMinMaxAvx2AggKernels(func.get()); + } +#endif +#if defined(ARROW_HAVE_RUNTIME_AVX512) + if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX512)) { + aggregate::AddMinMaxAvx512AggKernels(func.get()); + } +#endif + DCHECK_OK(registry->AddFunction(std::move(func))); DCHECK_OK(registry->AddFunction(aggregate::AddModeAggKernels())); diff --git a/cpp/src/arrow/compute/kernels/aggregate_sum_avx2.cc b/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc similarity index 80% rename from cpp/src/arrow/compute/kernels/aggregate_sum_avx2.cc rename to cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc index 2811c4cd865..e0c1118c714 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_sum_avx2.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc @@ -67,6 +67,17 @@ std::unique_ptr MeanInitAvx2(KernelContext* ctx, return visitor.Create(); } +// ---------------------------------------------------------------------- +// MinMax implementation + +std::unique_ptr MinMaxInitAvx2(KernelContext* ctx, + const KernelInitArgs& args) { + MinMaxInitState visitor( + ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(), + static_cast(*args.options)); + return visitor.Create(); +} + void AddSumAvx2AggKernels(ScalarAggregateFunction* func) { AddBasicAggKernels(SumInitAvx2, internal::SignedIntTypes(), int64(), func, SimdLevel::AVX2); @@ -81,6 +92,12 @@ void AddMeanAvx2AggKernels(ScalarAggregateFunction* func) { SimdLevel::AVX2); } +void AddMinMaxAvx2AggKernels(ScalarAggregateFunction* func) { + // Enable int types for AVX2 variants. + // No auto vectorize for float/double as it use fmin/fmax which has NaN handling. + AddMinMaxKernels(MinMaxInitAvx2, internal::IntTypes(), func, SimdLevel::AVX2); +} + } // namespace aggregate } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_sum_avx512.cc b/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc similarity index 80% rename from cpp/src/arrow/compute/kernels/aggregate_sum_avx512.cc rename to cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc index 00408027e1f..c2c748d3af7 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_sum_avx512.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc @@ -68,6 +68,17 @@ std::unique_ptr MeanInitAvx512(KernelContext* ctx, return visitor.Create(); } +// ---------------------------------------------------------------------- +// MinMax implementation + +std::unique_ptr MinMaxInitAvx512(KernelContext* ctx, + const KernelInitArgs& args) { + MinMaxInitState visitor( + ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(), + static_cast(*args.options)); + return visitor.Create(); +} + void AddSumAvx512AggKernels(ScalarAggregateFunction* func) { AddBasicAggKernels(SumInitAvx512, internal::SignedIntTypes(), int64(), func, SimdLevel::AVX512); @@ -82,6 +93,12 @@ void AddMeanAvx512AggKernels(ScalarAggregateFunction* func) { SimdLevel::AVX512); } +void AddMinMaxAvx512AggKernels(ScalarAggregateFunction* func) { + // Enable 32/64 int types for avx512 variants, no advantage on 8/16 int. + AddMinMaxKernels(MinMaxInitAvx512, {int32(), uint32(), int64(), uint64()}, func, + SimdLevel::AVX512); +} + } // namespace aggregate } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index 29db97381d6..0d8d43d95d6 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -17,6 +17,9 @@ #pragma once +#include + +#include "arrow/compute/api_aggregate.h" #include "arrow/compute/kernels/aggregate_internal.h" #include "arrow/compute/kernels/common.h" #include "arrow/util/align_util.h" @@ -41,22 +44,29 @@ void AddBasicAggKernels(KernelInit init, std::shared_ptr out_ty, ScalarAggregateFunction* func, SimdLevel::type simd_level = SimdLevel::NONE); +void AddMinMaxKernels(KernelInit init, + const std::vector>& types, + ScalarAggregateFunction* func, + SimdLevel::type simd_level = SimdLevel::NONE); + // SIMD variants for kernels void AddSumAvx2AggKernels(ScalarAggregateFunction* func); void AddMeanAvx2AggKernels(ScalarAggregateFunction* func); +void AddMinMaxAvx2AggKernels(ScalarAggregateFunction* func); void AddSumAvx512AggKernels(ScalarAggregateFunction* func); void AddMeanAvx512AggKernels(ScalarAggregateFunction* func); +void AddMinMaxAvx512AggKernels(ScalarAggregateFunction* func); std::shared_ptr AddModeAggKernels(); // ---------------------------------------------------------------------- // Sum implementation -template +template struct SumState { using SumType = typename FindAccumulatorType::Type; - using ThisType = SumState; + using ThisType = SumState; using T = typename TypeTraits::CType; using ArrayType = typename TypeTraits::ArrayType; @@ -217,10 +227,10 @@ struct SumState { } }; -template -struct SumState { +template +struct SumState { using SumType = typename FindAccumulatorType::Type; - using ThisType = SumState; + using ThisType = SumState; ThisType& operator+=(const ThisType& rhs) { this->count += rhs.count; @@ -239,10 +249,10 @@ struct SumState { typename SumType::c_type sum = 0; }; -template +template struct SumImpl : public ScalarAggregator { using ArrayType = typename TypeTraits::ArrayType; - using ThisType = SumImpl; + using ThisType = SumImpl; using SumType = typename FindAccumulatorType::Type; using OutputType = typename TypeTraits::ScalarType; @@ -263,11 +273,11 @@ struct SumImpl : public ScalarAggregator { } } - SumState state; + SumState state; }; -template -struct MeanImpl : public SumImpl { +template +struct MeanImpl : public SumImpl { void Finalize(KernelContext*, Datum* out) override { const bool is_valid = this->state.count > 0; const double divisor = static_cast(is_valid ? this->state.count : 1UL); @@ -312,6 +322,268 @@ struct SumLikeInit { } }; +// ---------------------------------------------------------------------- +// MinMax implementation + +template +struct MinMaxState {}; + +template +struct MinMaxState> { + using ThisType = MinMaxState; + using T = typename ArrowType::c_type; + + ThisType& operator+=(const ThisType& rhs) { + this->has_nulls |= rhs.has_nulls; + this->has_values |= rhs.has_values; + this->min = this->min && rhs.min; + this->max = this->max || rhs.max; + return *this; + } + + void MergeOne(T value) { + this->min = this->min && value; + this->max = this->max || value; + } + + T min = true; + T max = false; + bool has_nulls = false; + bool has_values = false; +}; + +template +struct MinMaxState> { + using ThisType = MinMaxState; + using T = typename ArrowType::c_type; + + ThisType& operator+=(const ThisType& rhs) { + this->has_nulls |= rhs.has_nulls; + this->has_values |= rhs.has_values; + this->min = std::min(this->min, rhs.min); + this->max = std::max(this->max, rhs.max); + return *this; + } + + void MergeOne(T value) { + this->min = std::min(this->min, value); + this->max = std::max(this->max, value); + } + + T min = std::numeric_limits::max(); + T max = std::numeric_limits::min(); + bool has_nulls = false; + bool has_values = false; +}; + +template +struct MinMaxState> { + using ThisType = MinMaxState; + using T = typename ArrowType::c_type; + + ThisType& operator+=(const ThisType& rhs) { + this->has_nulls |= rhs.has_nulls; + this->has_values |= rhs.has_values; + this->min = std::fmin(this->min, rhs.min); + this->max = std::fmax(this->max, rhs.max); + return *this; + } + + void MergeOne(T value) { + this->min = std::fmin(this->min, value); + this->max = std::fmax(this->max, value); + } + + T min = std::numeric_limits::infinity(); + T max = -std::numeric_limits::infinity(); + bool has_nulls = false; + bool has_values = false; +}; + +template +struct MinMaxImpl : public ScalarAggregator { + using ArrayType = typename TypeTraits::ArrayType; + using ThisType = MinMaxImpl; + using StateType = MinMaxState; + + MinMaxImpl(const std::shared_ptr& out_type, const MinMaxOptions& options) + : out_type(out_type), options(options) {} + + void Consume(KernelContext*, const ExecBatch& batch) override { + StateType local; + + ArrayType arr(batch[0].array()); + + const auto null_count = arr.null_count(); + local.has_nulls = null_count > 0; + local.has_values = (arr.length() - null_count) > 0; + + if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) { + this->state = local; + return; + } + + if (local.has_nulls) { + local += ConsumeWithNulls(arr); + } else { // All true values + for (int64_t i = 0; i < arr.length(); i++) { + local.MergeOne(arr.Value(i)); + } + } + this->state = local; + } + + void MergeFrom(KernelContext*, const KernelState& src) override { + const auto& other = checked_cast(src); + this->state += other.state; + } + + void Finalize(KernelContext*, Datum* out) override { + using ScalarType = typename TypeTraits::ScalarType; + + std::vector> values; + if (!state.has_values || + (state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL)) { + // (null, null) + values = {std::make_shared(), std::make_shared()}; + } else { + values = {std::make_shared(state.min), + std::make_shared(state.max)}; + } + out->value = std::make_shared(std::move(values), this->out_type); + } + + std::shared_ptr out_type; + MinMaxOptions options; + MinMaxState state; + + private: + StateType ConsumeWithNulls(const ArrayType& arr) const { + StateType local; + const int64_t length = arr.length(); + int64_t offset = arr.offset(); + const uint8_t* bitmap = arr.null_bitmap_data(); + int64_t idx = 0; + + const auto p = arrow::internal::BitmapWordAlign<1>(bitmap, offset, length); + // First handle the leading bits + const int64_t leading_bits = p.leading_bits; + while (idx < leading_bits) { + if (BitUtil::GetBit(bitmap, offset)) { + local.MergeOne(arr.Value(idx)); + } + idx++; + offset++; + } + + // The aligned parts scanned with BitBlockCounter + arrow::internal::BitBlockCounter data_counter(bitmap, offset, length - leading_bits); + auto current_block = data_counter.NextWord(); + while (idx < length) { + if (current_block.AllSet()) { // All true values + int run_length = 0; + // Scan forward until a block that has some false values (or the end) + while (current_block.length > 0 && current_block.AllSet()) { + run_length += current_block.length; + current_block = data_counter.NextWord(); + } + for (int64_t i = 0; i < run_length; i++) { + local.MergeOne(arr.Value(idx + i)); + } + idx += run_length; + offset += run_length; + // The current_block already computed, advance to next loop + continue; + } else if (!current_block.NoneSet()) { // Some values are null + BitmapReader reader(arr.null_bitmap_data(), offset, current_block.length); + for (int64_t i = 0; i < current_block.length; i++) { + if (reader.IsSet()) { + local.MergeOne(arr.Value(idx + i)); + } + reader.Next(); + } + + idx += current_block.length; + offset += current_block.length; + } else { // All null values + idx += current_block.length; + offset += current_block.length; + } + current_block = data_counter.NextWord(); + } + + return local; + } +}; + +template +struct BooleanMinMaxImpl : public MinMaxImpl { + using StateType = MinMaxState; + using ArrayType = typename TypeTraits::ArrayType; + using MinMaxImpl::MinMaxImpl; + using MinMaxImpl::options; + + void Consume(KernelContext*, const ExecBatch& batch) override { + StateType local; + ArrayType arr(batch[0].array()); + + const auto arr_length = arr.length(); + const auto null_count = arr.null_count(); + const auto valid_count = arr_length - null_count; + + local.has_nulls = null_count > 0; + local.has_values = valid_count > 0; + if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) { + this->state = local; + return; + } + + const auto true_count = arr.true_count(); + const auto false_count = valid_count - true_count; + local.max = true_count > 0; + local.min = false_count == 0; + + this->state = local; + } +}; + +template +struct MinMaxInitState { + std::unique_ptr state; + KernelContext* ctx; + const DataType& in_type; + const std::shared_ptr& out_type; + const MinMaxOptions& options; + + MinMaxInitState(KernelContext* ctx, const DataType& in_type, + const std::shared_ptr& out_type, const MinMaxOptions& options) + : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {} + + Status Visit(const DataType&) { + return Status::NotImplemented("No min/max implemented"); + } + + Status Visit(const HalfFloatType&) { + return Status::NotImplemented("No min/max implemented"); + } + + Status Visit(const BooleanType&) { + state.reset(new BooleanMinMaxImpl(out_type, options)); + return Status::OK(); + } + + template + enable_if_number Visit(const Type&) { + state.reset(new MinMaxImpl(out_type, options)); + return Status::OK(); + } + + std::unique_ptr Create() { + ctx->SetStatus(VisitTypeInline(in_type, this)); + return std::move(state); + } +}; + } // namespace aggregate } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 15cfcb42f1d..d5d46fe8843 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -594,6 +594,137 @@ TYPED_TEST(TestFloatingMinMaxKernel, DefaultOptions) { AssertDatumsEqual(explicit_defaults, no_options_provided); } +template +struct MinMaxResult { + using T = typename ArrowType::c_type; + + T min = 0; + T max = 0; + bool is_valid = false; +}; + +template +static enable_if_integer> NaiveMinMax( + const Array& array) { + using T = typename ArrowType::c_type; + using ArrayType = typename TypeTraits::ArrayType; + + MinMaxResult result; + + const auto& array_numeric = reinterpret_cast(array); + const auto values = array_numeric.raw_values(); + + if (array.length() <= array.null_count()) { // All null values + return result; + } + + T min = std::numeric_limits::max(); + T max = std::numeric_limits::min(); + if (array.null_count() != 0) { // Some values are null + internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), + array.length()); + for (int64_t i = 0; i < array.length(); i++) { + if (reader.IsSet()) { + min = std::min(min, values[i]); + max = std::max(max, values[i]); + } + reader.Next(); + } + } else { // All true values + for (int64_t i = 0; i < array.length(); i++) { + min = std::min(min, values[i]); + max = std::max(max, values[i]); + } + } + + result.min = min; + result.max = max; + result.is_valid = true; + return result; +} + +template +static enable_if_floating_point> NaiveMinMax( + const Array& array) { + using T = typename ArrowType::c_type; + using ArrayType = typename TypeTraits::ArrayType; + + MinMaxResult result; + + const auto& array_numeric = reinterpret_cast(array); + const auto values = array_numeric.raw_values(); + + if (array.length() <= array.null_count()) { // All null values + return result; + } + + T min = std::numeric_limits::infinity(); + T max = -std::numeric_limits::infinity(); + if (array.null_count() != 0) { // Some values are null + internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), + array.length()); + for (int64_t i = 0; i < array.length(); i++) { + if (reader.IsSet()) { + min = std::fmin(min, values[i]); + max = std::fmax(max, values[i]); + } + reader.Next(); + } + } else { // All true values + for (int64_t i = 0; i < array.length(); i++) { + min = std::fmin(min, values[i]); + max = std::fmax(max, values[i]); + } + } + + result.min = min; + result.max = max; + result.is_valid = true; + return result; +} + +template +void ValidateMinMax(const Array& array) { + using Traits = TypeTraits; + using ScalarType = typename Traits::ScalarType; + + ASSERT_OK_AND_ASSIGN(Datum out, MinMax(array)); + const StructScalar& value = out.scalar_as(); + + auto expected = NaiveMinMax(array); + const auto& out_min = checked_cast(*value.value[0]); + const auto& out_max = checked_cast(*value.value[1]); + + if (expected.is_valid) { + ASSERT_TRUE(out_min.is_valid); + ASSERT_TRUE(out_max.is_valid); + ASSERT_EQ(expected.min, out_min.value); + ASSERT_EQ(expected.max, out_max.value); + } else { // All null values + ASSERT_FALSE(out_min.is_valid); + ASSERT_FALSE(out_max.is_valid); + } +} + +template +class TestRandomNumericMinMaxKernel : public ::testing::Test {}; + +TYPED_TEST_SUITE(TestRandomNumericMinMaxKernel, NumericArrowTypes); +TYPED_TEST(TestRandomNumericMinMaxKernel, RandomArrayMinMax) { + auto rand = random::RandomArrayGenerator(0x8afc055); + // Test size up to 1<<11 (2048). + for (size_t i = 3; i < 12; i += 2) { + for (auto null_probability : {0.0, 0.01, 0.1, 0.5, 0.99, 1.0}) { + int64_t base_length = (1UL << i) + 2; + auto array = rand.Numeric(base_length, 0, 100, null_probability); + for (auto length_adjust : {-2, -1, 0, 1, 2}) { + int64_t length = (1UL << i) + length_adjust; + ValidateMinMax(*array->Slice(0, length)); + } + } + } +} + // // Mode //