Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,7 @@ if(ARROW_COMPUTE)
compute/kernels/vector_run_end_encode.cc
compute/kernels/vector_select_k.cc
compute/kernels/vector_sort.cc
compute/kernels/vector_statistics.cc
compute/kernels/vector_swizzle.cc
compute/key_hash_internal.cc
compute/key_map_internal.cc
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/compute/api_vector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ static auto kSortOptionsType = GetFunctionOptionsType<SortOptions>(
// Options type descriptor for PartitionNthOptions, listing its "pivot" and
// "null_placement" data members.
static auto kPartitionNthOptionsType = GetFunctionOptionsType<PartitionNthOptions>(
DataMember("pivot", &PartitionNthOptions::pivot),
DataMember("null_placement", &PartitionNthOptions::null_placement));
// Options type descriptor for WinsorizeOptions, listing its "lower_limit" and
// "upper_limit" data members.
static auto kWinsorizeOptionsType = GetFunctionOptionsType<WinsorizeOptions>(
DataMember("lower_limit", &WinsorizeOptions::lower_limit),
DataMember("upper_limit", &WinsorizeOptions::upper_limit));
// Options type descriptor for SelectKOptions, listing its "k" and "sort_keys"
// data members.
static auto kSelectKOptionsType = GetFunctionOptionsType<SelectKOptions>(
DataMember("k", &SelectKOptions::k),
DataMember("sort_keys", &SelectKOptions::sort_keys));
Expand Down Expand Up @@ -208,6 +211,11 @@ PartitionNthOptions::PartitionNthOptions(int64_t pivot, NullPlacement null_place
null_placement(null_placement) {}
constexpr char PartitionNthOptions::kTypeName[];

// Construct winsorize options from the two quantile limits, binding the
// instance to the WinsorizeOptions options type descriptor.
// The limits are stored as given; no range check is done here.
WinsorizeOptions::WinsorizeOptions(double lower_limit, double upper_limit)
: FunctionOptions(internal::kWinsorizeOptionsType),
lower_limit(lower_limit),
upper_limit(upper_limit) {}

SelectKOptions::SelectKOptions(int64_t k, std::vector<SortKey> sort_keys)
: FunctionOptions(internal::kSelectKOptionsType),
k(k),
Expand Down Expand Up @@ -275,6 +283,7 @@ void RegisterVectorOptions(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunctionOptionsType(kListFlattenOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kInversePermutationOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kScatterOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kWinsorizeOptionsType));
}
} // namespace internal

Expand Down
19 changes: 19 additions & 0 deletions cpp/src/arrow/compute/api_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,25 @@ class ARROW_EXPORT PartitionNthOptions : public FunctionOptions {
NullPlacement null_placement;
};

/// \brief Options for the winsorize function
///
/// Both limits are quantiles, expressed as fractions between 0 and 1.
/// The default-constructed options use lower_limit = 0 and upper_limit = 1.
class ARROW_EXPORT WinsorizeOptions : public FunctionOptions {
public:
WinsorizeOptions(double lower_limit, double upper_limit);
WinsorizeOptions() : WinsorizeOptions(0, 1) {}
static constexpr char const kTypeName[] = "WinsorizeOptions";

/// The quantile below which all values are replaced with the quantile's value.
///
/// For example, if lower_limit = 0.05, then all values lower than the
/// 5% quantile value (i.e. the lowest 5% of the values) will be replaced
/// with the 5% quantile value.
double lower_limit;

/// The quantile above which all values are replaced with the quantile's value.
///
/// For example, if upper_limit = 0.95, then all values greater than the
/// 95% quantile value (i.e. the highest 5% of the values) will be replaced
/// with the 95% quantile value.
double upper_limit;
};

/// \brief Options for cumulative functions
/// \note Also aliased as CumulativeSumOptions for backward compatibility
class ARROW_EXPORT CumulativeOptions : public FunctionOptions {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/kernels/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ add_arrow_compute_test(vector_test
vector_nested_test.cc
vector_replace_test.cc
vector_run_end_encode_test.cc
vector_statistics_test.cc
select_k_test.cc
EXTRA_LINK_LIBS
arrow_compute_kernels_testing
Expand Down
7 changes: 3 additions & 4 deletions cpp/src/arrow/compute/kernels/aggregate_mode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,9 @@ void RegisterScalarAggregateMode(FunctionRegistry* registry) {
ModeExecutorChunked<StructType, BooleanType>::Exec)));
for (const auto& type : NumericTypes()) {
// TODO(wesm):
DCHECK_OK(func->AddKernel(NewModeKernel(
type, GenerateNumeric<ModeExecutor, StructType>(*type),
GenerateNumeric<ModeExecutorChunked, StructType, VectorKernel::ChunkedExec>(
*type))));
DCHECK_OK(func->AddKernel(
NewModeKernel(type, GenerateNumeric<ModeExecutor, StructType>(*type),
GenerateNumeric<ModeExecutorChunked, StructType>(*type))));
}
// Type parameters are ignored
DCHECK_OK(func->AddKernel(
Expand Down
26 changes: 11 additions & 15 deletions cpp/src/arrow/compute/kernels/aggregate_quantile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ template <typename T>
double DataPointToDouble(T value, const DataType&) {
return static_cast<double>(value);
}
// Decimal overloads: the conversion to double needs the scale, which is
// carried by the decimal data type rather than by the value itself.
double DataPointToDouble(const Decimal32& value, const DataType& ty) {
  const auto& decimal_type = checked_cast<const DecimalType&>(ty);
  return value.ToDouble(decimal_type.scale());
}
double DataPointToDouble(const Decimal64& value, const DataType& ty) {
  const auto& decimal_type = checked_cast<const DecimalType&>(ty);
  return value.ToDouble(decimal_type.scale());
}
double DataPointToDouble(const Decimal128& value, const DataType& ty) {
  const auto& decimal_type = checked_cast<const DecimalType&>(ty);
  return value.ToDouble(decimal_type.scale());
}
Expand Down Expand Up @@ -524,23 +530,13 @@ void AddQuantileKernels(VectorFunction* func) {
base.signature = KernelSignature::Make({InputType(ty)}, OutputType(ResolveOutput));
// output type is determined at runtime, set template argument to nulltype
base.exec = GenerateNumeric<QuantileExecutor, NullType>(*ty);
base.exec_chunked =
GenerateNumeric<QuantileExecutorChunked, NullType, VectorKernel::ChunkedExec>(
*ty);
DCHECK_OK(func->AddKernel(base));
}
{
base.signature =
KernelSignature::Make({InputType(Type::DECIMAL128)}, OutputType(ResolveOutput));
base.exec = QuantileExecutor<NullType, Decimal128Type>::Exec;
base.exec_chunked = QuantileExecutorChunked<NullType, Decimal128Type>::Exec;
base.exec_chunked = GenerateNumeric<QuantileExecutorChunked, NullType>(*ty);
DCHECK_OK(func->AddKernel(base));
}
{
base.signature =
KernelSignature::Make({InputType(Type::DECIMAL256)}, OutputType(ResolveOutput));
base.exec = QuantileExecutor<NullType, Decimal256Type>::Exec;
base.exec_chunked = QuantileExecutorChunked<NullType, Decimal256Type>::Exec;
for (auto type_id : DecimalTypeIds()) {
base.signature = KernelSignature::Make({type_id}, OutputType(ResolveOutput));
base.exec = GenerateDecimal<QuantileExecutor, NullType>(type_id);
base.exec_chunked = GenerateDecimal<QuantileExecutorChunked, NullType>(type_id);
DCHECK_OK(func->AddKernel(base));
}
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/kernels/aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4263,7 +4263,8 @@ TEST(TestQuantileKernel, Decimal) {
ValidateOutput(*out_array);
AssertArraysEqual(*expected, *out_array, /*verbose=*/true);
};
for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) {
for (const auto& ty :
{decimal32(3, 2), decimal64(3, 2), decimal128(3, 2), decimal256(3, 2)}) {
check(ArrayFromJSON(ty, R"(["1.00", "5.00", null])"),
QuantileOptions(0.5, QuantileOptions::LINEAR),
ArrayFromJSON(float64(), R"([3.00])"));
Expand Down
11 changes: 6 additions & 5 deletions cpp/src/arrow/compute/kernels/codegen_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -988,9 +988,9 @@ struct FailFunctor<VectorKernel::ChunkedExec> {
};

// GD for numeric types (integer and floating point)
template <template <typename...> class Generator, typename Type0,
typename KernelType = ArrayKernelExec, typename... Args>
KernelType GenerateNumeric(detail::GetTypeId get_id) {
template <template <typename...> class Generator, typename Type0, typename... Args>
auto GenerateNumeric(detail::GetTypeId get_id) {
using KernelType = decltype(&Generator<Type0, Int8Type, Args...>::Exec);
switch (get_id.id) {
case Type::INT8:
return Generator<Type0, Int8Type, Args...>::Exec;
Expand Down Expand Up @@ -1367,7 +1367,8 @@ ArrayKernelExec GenerateTemporal(detail::GetTypeId get_id) {
//
// See "Numeric" above for description of the generator functor
template <template <typename...> class Generator, typename Type0, typename... Args>
ArrayKernelExec GenerateDecimal(detail::GetTypeId get_id) {
auto GenerateDecimal(detail::GetTypeId get_id) {
using KernelType = decltype(&Generator<Type0, Decimal256Type, Args...>::Exec);
switch (get_id.id) {
case Type::DECIMAL32:
return Generator<Type0, Decimal32Type, Args...>::Exec;
Expand All @@ -1379,7 +1380,7 @@ ArrayKernelExec GenerateDecimal(detail::GetTypeId get_id) {
return Generator<Type0, Decimal256Type, Args...>::Exec;
default:
DCHECK(false);
return nullptr;
return KernelType(nullptr);
}
}

Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/compute/kernels/scalar_cast_boolean.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ std::vector<std::shared_ptr<CastFunction>> GetBooleanCasts() {

for (const auto& ty : NumericTypes()) {
ArrayKernelExec exec =
GenerateNumeric<applicator::ScalarUnary, BooleanType, ArrayKernelExec, IsNonZero>(
*ty);
GenerateNumeric<applicator::ScalarUnary, BooleanType, IsNonZero>(*ty);
DCHECK_OK(func->AddKernel(ty->id(), {ty}, boolean(), exec));
}
for (const auto& ty : BaseBinaryTypes()) {
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/compute/kernels/scalar_cast_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,7 @@ void AddNumberToStringCasts(CastFunction* func) {
template <typename OutType>
void AddDecimalToStringCasts(CastFunction* func) {
auto out_ty = TypeTraits<OutType>::type_singleton();
for (const auto& in_tid : std::vector<Type::type>{Type::DECIMAL32, Type::DECIMAL64,
Type::DECIMAL128, Type::DECIMAL256}) {
for (const auto& in_tid : DecimalTypeIds()) {
DCHECK_OK(
func->AddKernel(in_tid, {in_tid}, out_ty,
GenerateDecimal<DecimalToStringCastFunctor, OutType>(in_tid),
Expand Down
209 changes: 209 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_statistics.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <optional>
#include <utility>

#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/function.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/compute/registry.h"
#include "arrow/result.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"

namespace arrow::compute::internal {

using ::arrow::internal::checked_cast;

namespace {

// Check that both winsorize limits lie in [0, 1] and that the lower limit
// does not exceed the upper limit. Returns Status::Invalid otherwise.
Status ValidateOptions(const WinsorizeOptions& options) {
  // Written as a positive range test so that a NaN limit is rejected too.
  const auto is_valid_quantile = [](double limit) { return limit >= 0 && limit <= 1; };
  const bool limits_in_range =
      is_valid_quantile(options.lower_limit) && is_valid_quantile(options.upper_limit);
  if (!limits_in_range) {
    return Status::Invalid("winsorize limits must be between 0 and 1");
  }
  if (options.lower_limit > options.upper_limit) {
    return Status::Invalid(
        "winsorize upper limit must be equal or greater than lower limit");
  }
  return Status::OK();
}

using WinsorizeState = internal::OptionsWrapper<WinsorizeOptions>;

// We have a first unused template parameter for compatibility with GenerateNumeric.
template <typename Unused, typename Type>
struct Winsorize {
using ArrayType = typename TypeTraits<Type>::ArrayType;
using CType = typename TypeTraits<Type>::CType;

// Array entry point: validate the options, compute the lower and upper
// quantile values of the input, then clip the input values to these bounds.
//
// Returns an error if the options are invalid or if the quantile computation
// or the output allocation fails.
static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
  const auto& options = WinsorizeState::Get(ctx);
  RETURN_NOT_OK(ValidateOptions(options));
  auto data = batch.values[0].array.ToArrayData();
  ARROW_ASSIGN_OR_RAISE(auto maybe_quantiles, GetQuantileValues(ctx, data, options));
  auto out_data = out->array_data_mutable();
  if (!maybe_quantiles.has_value()) {
    // Only nulls and NaNs => return input as-is
    out_data->null_count = data->null_count.load();
    out_data->length = data->length;
    // The buffers are shared with the input, so the output must also address
    // them through the input's offset; otherwise a sliced input would expose
    // the wrong window of the shared buffers.
    out_data->offset = data->offset;
    out_data->buffers = data->buffers;
    return Status::OK();
  }
  return ClipValues(*data, maybe_quantiles.value(), out_data, ctx);
}

// Chunked entry point: the clipping bounds are computed once from the whole
// chunked input, then every chunk is clipped separately against those bounds.
static Status ExecChunked(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
  const auto& winsorize_options = WinsorizeState::Get(ctx);
  RETURN_NOT_OK(ValidateOptions(winsorize_options));
  const auto& input = batch.values[0].chunked_array();
  ARROW_ASSIGN_OR_RAISE(auto bounds, GetQuantileValues(ctx, input, winsorize_options));
  if (bounds.has_value()) {
    ArrayVector clipped_chunks;
    clipped_chunks.reserve(input->num_chunks());
    for (const auto& chunk : input->chunks()) {
      // Start from a copy of the chunk's metadata; ClipValues fills in the rest.
      auto clipped = chunk->data()->Copy();
      RETURN_NOT_OK(ClipValues(*chunk->data(), *bounds, clipped.get(), ctx));
      clipped_chunks.push_back(MakeArray(clipped));
    }
    return ChunkedArray::Make(std::move(clipped_chunks)).Value(out);
  }
  // Only nulls and NaNs => return input as-is
  *out = input;
  return Status::OK();
}

// The pair of values that input values are clipped to.
struct QuantileValues {
  CType lower_bound;
  CType upper_bound;
};

static Result<std::optional<QuantileValues>> GetQuantileValues(
KernelContext* ctx, const Datum& input, const WinsorizeOptions& options) {
// We use "nearest" to avoid the conversion of quantile values to double.
QuantileOptions quantile_options(/*q=*/{options.lower_limit, options.upper_limit},
QuantileOptions::NEAREST);
ARROW_ASSIGN_OR_RAISE(
auto quantile,
CallFunction("quantile", {input}, &quantile_options, ctx->exec_context()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pitrou Do you think there is any benefit to resolving and using the quantile kernel here directly, as opposed to using CallFunction?

I suppose it is easier this way (using CallFunction), but I wonder, in general, when writing a kernel that uses other kernels/functions, whether it is better to use CallFunction or to resolve the kernel and call kernel->Exec.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolving the kernel could perhaps save some nanoseconds, but I'm not sure that's significant compared to the other costs.

auto quantile_array = quantile.array_as<ArrayType>();
DCHECK_EQ(quantile_array->length(), 2);
// The quantile function outputs either all nulls or no nulls at all.
if (quantile_array->null_count() == 2) {
return std::nullopt;
}
DCHECK_EQ(quantile_array->null_count(), 0);
return QuantileValues{CType(quantile_array->Value(0)),
CType(quantile_array->Value(1))};
}

// Fill `out` with the values of `data`, where each non-null value is clamped
// to the range [quantiles.lower_bound, quantiles.upper_bound].
//
// The validity bitmap is shared with the input rather than copied, so `out`
// is given the same offset as `data`, and the newly allocated values buffer
// spans `offset + length` slots so that it can be addressed through that
// offset. Slots that are not written (nulls and the slots before the offset)
// are zero-initialized.
//
// Returns an error if the values buffer cannot be allocated.
static Status ClipValues(const ArrayData& data, QuantileValues quantiles,
                         ArrayData* out, KernelContext* ctx) {
  DCHECK_EQ(out->buffers.size(), data.buffers.size());
  out->null_count = data.null_count.load();
  out->length = data.length;
  // Keep the output aligned with the shared validity bitmap
  out->offset = data.offset;
  out->buffers[0] = data.buffers[0];
  const int64_t values_nbytes =
      (data.offset + data.length) * static_cast<int64_t>(sizeof(CType));
  ARROW_ASSIGN_OR_RAISE(out->buffers[1], ctx->Allocate(values_nbytes));
  // Avoid leaving uninitialized memory under null entries
  std::memset(out->buffers[1]->mutable_data(), 0, static_cast<size_t>(values_nbytes));

  // Both pointers are already advanced by the (identical) array offset, so
  // index 0 is the first logical element on both sides.
  const CType* in_values = data.GetValues<CType>(1);
  CType* out_values = out->GetMutableValues<CType>(1);

  // `position` is relative to the array offset passed to the bit run visitor
  auto visit = [&](int64_t position, int64_t length) {
    for (int64_t i = position; i < position + length; ++i) {
      if (in_values[i] < quantiles.lower_bound) {
        out_values[i] = quantiles.lower_bound;
      } else if (in_values[i] > quantiles.upper_bound) {
        out_values[i] = quantiles.upper_bound;
      } else {
        // NaNs also fall here
        out_values[i] = in_values[i];
      }
    }
  };
  arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
                                       visit);
  return Status::OK();
}
};

// Adapter exposing Winsorize::ExecChunked under the name `Exec`, so that it can
// be passed as a generator to GenerateNumeric / GenerateDecimal.
template <typename Unused, typename Type>
struct WinsorizeChunked {
  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
    using Impl = Winsorize<Unused, Type>;
    return Impl::ExecChunked(ctx, batch, out);
  }
};

// Output type resolver: the output has the same type as the single input.
Result<TypeHolder> ResolveWinsorizeOutput(KernelContext* ctx,
                                          const std::vector<TypeHolder>& in_types) {
  DCHECK_EQ(in_types.size(), 1);
  const TypeHolder& input_type = in_types.front();
  return input_type;
}

// User-facing documentation of the "winsorize" function: summary, description,
// argument names, options class name, and whether options are required.
const FunctionDoc winsorize_doc(
"Winsorize an array",
("This function applies a winsorization transform to the input array\n"
"so as to reduce the influence of potential outliers.\n"
"NaNs and nulls in the input are ignored for the purpose of computing\n"
"the lower and upper quantiles.\n"
"The quantile limits can be changed in WinsorizeOptions."),
{"array"}, "WinsorizeOptions", /*options_required=*/true);

} // namespace

// Register the "winsorize" vector function in `registry`, with one kernel for
// each numeric type and each decimal type.
void RegisterVectorStatistics(FunctionRegistry* registry) {
  static const WinsorizeOptions kDefaultWinsorizeOptions;

  auto func = std::make_shared<VectorFunction>("winsorize", Arity::Unary(),
                                               winsorize_doc, &kDefaultWinsorizeOptions);

  // Settings shared by all winsorize kernels
  VectorKernel kernel;
  // The variable is ill-named, but since we output a ChunkedArray ourselves,
  // the function execution logic shouldn't try to wrap it again.
  kernel.output_chunked = false;
  kernel.can_execute_chunkwise = false;
  kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
  kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
  kernel.init = WinsorizeState::Init;

  // Specialize the shared kernel for one input type and add it to the function
  const auto add_kernel = [&](Type::type type_id, ArrayKernelExec exec,
                              VectorKernel::ChunkedExec exec_chunked) {
    kernel.signature = KernelSignature::Make({type_id}, &ResolveWinsorizeOutput);
    kernel.exec = exec;
    kernel.exec_chunked = exec_chunked;
    DCHECK_OK(func->AddKernel(kernel));
  };

  for (const auto& numeric_type : NumericTypes()) {
    const auto type_id = numeric_type->id();
    add_kernel(type_id, GenerateNumeric<Winsorize, /*Unused*/ void>(type_id),
               GenerateNumeric<WinsorizeChunked, /*Unused*/ void>(type_id));
  }
  for (auto type_id : DecimalTypeIds()) {
    add_kernel(type_id, GenerateDecimal<Winsorize, /*Unused*/ void>(type_id),
               GenerateDecimal<WinsorizeChunked, /*Unused*/ void>(type_id));
  }
  DCHECK_OK(registry->AddFunction(std::move(func)));
}

} // namespace arrow::compute::internal
Loading
Loading