Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -387,17 +387,17 @@ if(ARROW_COMPUTE)
compute/kernels/vector_sort.cc)

if(CXX_SUPPORTS_AVX2)
list(APPEND ARROW_SRCS compute/kernels/aggregate_sum_avx2.cc)
set_source_files_properties(compute/kernels/aggregate_sum_avx2.cc PROPERTIES
list(APPEND ARROW_SRCS compute/kernels/aggregate_basic_avx2.cc)
set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc PROPERTIES
SKIP_PRECOMPILE_HEADERS ON)
set_source_files_properties(compute/kernels/aggregate_sum_avx2.cc PROPERTIES
set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc PROPERTIES
COMPILE_FLAGS ${ARROW_AVX2_FLAG})
endif()
if(CXX_SUPPORTS_AVX512)
list(APPEND ARROW_SRCS compute/kernels/aggregate_sum_avx512.cc)
set_source_files_properties(compute/kernels/aggregate_sum_avx512.cc PROPERTIES
list(APPEND ARROW_SRCS compute/kernels/aggregate_basic_avx512.cc)
set_source_files_properties(compute/kernels/aggregate_basic_avx512.cc PROPERTIES
SKIP_PRECOMPILE_HEADERS ON)
set_source_files_properties(compute/kernels/aggregate_sum_avx512.cc PROPERTIES
set_source_files_properties(compute/kernels/aggregate_basic_avx512.cc PROPERTIES
COMPILE_FLAGS ${ARROW_AVX512_FLAG})
endif()
endif()
Expand Down
17 changes: 0 additions & 17 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,23 +130,6 @@ Result<Datum> MinMax(const Datum& value,
const MinMaxOptions& options = MinMaxOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Calculate the min / max of a numeric array.
///
/// This function returns both the min and max as a collection. The resulting
/// datum thus consists of two scalar datums: {Datum(min), Datum(max)}
///
/// \param[in] array input array
/// \param[in] options see MinMaxOptions for more information
/// \param[in] ctx the function execution context, optional
/// \return resulting datum containing a {min, max} collection
///
/// \since 1.0.0
/// \note API not yet finalized
ARROW_EXPORT
Result<Datum> MinMax(const Array& array,
const MinMaxOptions& options = MinMaxOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Calculate the modal (most common) value of a numeric array
///
/// This function returns both mode and count as a struct scalar, with type
Expand Down
225 changes: 16 additions & 209 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

#include <cmath>

#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/kernels/aggregate_basic_internal.h"
#include "arrow/compute/kernels/aggregate_internal.h"
Expand Down Expand Up @@ -132,212 +130,10 @@ std::unique_ptr<KernelState> MeanInit(KernelContext* ctx, const KernelInitArgs&
// ----------------------------------------------------------------------
// MinMax implementation

template <typename ArrowType, typename Enable = void>
struct MinMaxState {};

template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_boolean<ArrowType>> {
using ThisType = MinMaxState<ArrowType>;
using T = typename ArrowType::c_type;

ThisType& operator+=(const ThisType& rhs) {
this->has_nulls |= rhs.has_nulls;
this->has_values |= rhs.has_values;
this->min = this->min && rhs.min;
this->max = this->max || rhs.max;
return *this;
}

void MergeOne(T value) {
this->min = this->min && value;
this->max = this->max || value;
}

T min = true;
T max = false;
bool has_nulls = false;
bool has_values = false;
};

template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_integer<ArrowType>> {
using ThisType = MinMaxState<ArrowType>;
using T = typename ArrowType::c_type;

ThisType& operator+=(const ThisType& rhs) {
this->has_nulls |= rhs.has_nulls;
this->has_values |= rhs.has_values;
this->min = std::min(this->min, rhs.min);
this->max = std::max(this->max, rhs.max);
return *this;
}

void MergeOne(T value) {
this->min = std::min(this->min, value);
this->max = std::max(this->max, value);
}

T min = std::numeric_limits<T>::max();
T max = std::numeric_limits<T>::min();
bool has_nulls = false;
bool has_values = false;
};

template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_floating_point<ArrowType>> {
using ThisType = MinMaxState<ArrowType>;
using T = typename ArrowType::c_type;

ThisType& operator+=(const ThisType& rhs) {
this->has_nulls |= rhs.has_nulls;
this->has_values |= rhs.has_values;
this->min = std::fmin(this->min, rhs.min);
this->max = std::fmax(this->max, rhs.max);
return *this;
}

void MergeOne(T value) {
this->min = std::fmin(this->min, value);
this->max = std::fmax(this->max, value);
}

T min = std::numeric_limits<T>::infinity();
T max = -std::numeric_limits<T>::infinity();
bool has_nulls = false;
bool has_values = false;
};

template <typename ArrowType>
struct MinMaxImpl : public ScalarAggregator {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using ThisType = MinMaxImpl<ArrowType>;
using StateType = MinMaxState<ArrowType>;

MinMaxImpl(const std::shared_ptr<DataType>& out_type, const MinMaxOptions& options)
: out_type(out_type), options(options) {}

void Consume(KernelContext*, const ExecBatch& batch) override {
StateType local;

ArrayType arr(batch[0].array());

const auto null_count = arr.null_count();
local.has_nulls = null_count > 0;
local.has_values = (arr.length() - null_count) > 0;

if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) {
this->state = local;
return;
}

if (local.has_nulls) {
BitmapReader reader(arr.null_bitmap_data(), arr.offset(), arr.length());
for (int64_t i = 0; i < arr.length(); i++) {
if (reader.IsSet()) {
local.MergeOne(arr.Value(i));
}
reader.Next();
}
} else {
for (int64_t i = 0; i < arr.length(); i++) {
local.MergeOne(arr.Value(i));
}
}
this->state = local;
}

void MergeFrom(KernelContext*, const KernelState& src) override {
const auto& other = checked_cast<const ThisType&>(src);
this->state += other.state;
}

void Finalize(KernelContext*, Datum* out) override {
using ScalarType = typename TypeTraits<ArrowType>::ScalarType;

std::vector<std::shared_ptr<Scalar>> values;
if (!state.has_values ||
(state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL)) {
// (null, null)
values = {std::make_shared<ScalarType>(), std::make_shared<ScalarType>()};
} else {
values = {std::make_shared<ScalarType>(state.min),
std::make_shared<ScalarType>(state.max)};
}
out->value = std::make_shared<StructScalar>(std::move(values), this->out_type);
}

std::shared_ptr<DataType> out_type;
MinMaxOptions options;
MinMaxState<ArrowType> state;
};

struct BooleanMinMaxImpl : public MinMaxImpl<BooleanType> {
using MinMaxImpl::MinMaxImpl;

void Consume(KernelContext*, const ExecBatch& batch) override {
StateType local;
ArrayType arr(batch[0].array());

const auto arr_length = arr.length();
const auto null_count = arr.null_count();
const auto valid_count = arr_length - null_count;

local.has_nulls = null_count > 0;
local.has_values = valid_count > 0;
if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) {
this->state = local;
return;
}

const auto true_count = arr.true_count();
const auto false_count = valid_count - true_count;
local.max = true_count > 0;
local.min = false_count == 0;

this->state = local;
}
};

struct MinMaxInitState {
std::unique_ptr<KernelState> state;
KernelContext* ctx;
const DataType& in_type;
const std::shared_ptr<DataType>& out_type;
const MinMaxOptions& options;

MinMaxInitState(KernelContext* ctx, const DataType& in_type,
const std::shared_ptr<DataType>& out_type, const MinMaxOptions& options)
: ctx(ctx), in_type(in_type), out_type(out_type), options(options) {}

Status Visit(const DataType&) {
return Status::NotImplemented("No min/max implemented");
}

Status Visit(const HalfFloatType&) {
return Status::NotImplemented("No sum implemented");
}

Status Visit(const BooleanType&) {
state.reset(new BooleanMinMaxImpl(out_type, options));
return Status::OK();
}

template <typename Type>
enable_if_number<Type, Status> Visit(const Type&) {
state.reset(new MinMaxImpl<Type>(out_type, options));
return Status::OK();
}

std::unique_ptr<KernelState> Create() {
ctx->SetStatus(VisitTypeInline(in_type, this));
return std::move(state);
}
};

std::unique_ptr<KernelState> MinMaxInit(KernelContext* ctx, const KernelInitArgs& args) {
MinMaxInitState visitor(ctx, *args.inputs[0].type,
args.kernel->signature->out_type().type(),
static_cast<const MinMaxOptions&>(*args.options));
MinMaxInitState<SimdLevel::NONE> visitor(
ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(),
static_cast<const MinMaxOptions&>(*args.options));
return visitor.Create();
}

Expand All @@ -363,8 +159,7 @@ void AddBasicAggKernels(KernelInit init,

void AddMinMaxKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE) {
ScalarAggregateFunction* func, SimdLevel::type simd_level) {
for (const auto& ty : types) {
// array[T] -> scalar[struct<min: T, max: T>]
auto out_ty = struct_({field("min", ty), field("max", ty)});
Expand Down Expand Up @@ -431,6 +226,18 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
&default_minmax_options);
aggregate::AddMinMaxKernels(aggregate::MinMaxInit, {boolean()}, func.get());
aggregate::AddMinMaxKernels(aggregate::MinMaxInit, NumericTypes(), func.get());
// Add the SIMD variants for min max
#if defined(ARROW_HAVE_RUNTIME_AVX2)
if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) {
aggregate::AddMinMaxAvx2AggKernels(func.get());
}
#endif
#if defined(ARROW_HAVE_RUNTIME_AVX512)
if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX512)) {
aggregate::AddMinMaxAvx512AggKernels(func.get());
}
#endif

DCHECK_OK(registry->AddFunction(std::move(func)));

DCHECK_OK(registry->AddFunction(aggregate::AddModeAggKernels()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ std::unique_ptr<KernelState> MeanInitAvx2(KernelContext* ctx,
return visitor.Create();
}

// ----------------------------------------------------------------------
// MinMax implementation

std::unique_ptr<KernelState> MinMaxInitAvx2(KernelContext* ctx,
const KernelInitArgs& args) {
MinMaxInitState<SimdLevel::AVX2> visitor(
ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(),
static_cast<const MinMaxOptions&>(*args.options));
return visitor.Create();
}

void AddSumAvx2AggKernels(ScalarAggregateFunction* func) {
AddBasicAggKernels(SumInitAvx2, internal::SignedIntTypes(), int64(), func,
SimdLevel::AVX2);
Expand All @@ -81,6 +92,12 @@ void AddMeanAvx2AggKernels(ScalarAggregateFunction* func) {
SimdLevel::AVX2);
}

void AddMinMaxAvx2AggKernels(ScalarAggregateFunction* func) {
// Enable int types for AVX2 variants.
// No auto vectorize for float/double as it use fmin/fmax which has NaN handling.
AddMinMaxKernels(MinMaxInitAvx2, internal::IntTypes(), func, SimdLevel::AVX2);
}

} // namespace aggregate
} // namespace compute
} // namespace arrow
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ std::unique_ptr<KernelState> MeanInitAvx512(KernelContext* ctx,
return visitor.Create();
}

// ----------------------------------------------------------------------
// MinMax implementation

std::unique_ptr<KernelState> MinMaxInitAvx512(KernelContext* ctx,
const KernelInitArgs& args) {
MinMaxInitState<SimdLevel::AVX512> visitor(
ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(),
static_cast<const MinMaxOptions&>(*args.options));
return visitor.Create();
}

void AddSumAvx512AggKernels(ScalarAggregateFunction* func) {
AddBasicAggKernels(SumInitAvx512, internal::SignedIntTypes(), int64(), func,
SimdLevel::AVX512);
Expand All @@ -82,6 +93,12 @@ void AddMeanAvx512AggKernels(ScalarAggregateFunction* func) {
SimdLevel::AVX512);
}

void AddMinMaxAvx512AggKernels(ScalarAggregateFunction* func) {
// Enable 32/64 int types for avx512 variants, no advantage on 8/16 int.
AddMinMaxKernels(MinMaxInitAvx512, {int32(), uint32(), int64(), uint64()}, func,
SimdLevel::AVX512);
}

} // namespace aggregate
} // namespace compute
} // namespace arrow
Loading