From 28a13320f4aa98107ef3d1032747c4f709dafc74 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 12 Mar 2025 10:45:47 +0100 Subject: [PATCH 1/5] GH-45755: [C++][Compute] Add winsorize function --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/compute/api_vector.cc | 9 + cpp/src/arrow/compute/api_vector.h | 19 ++ cpp/src/arrow/compute/kernels/CMakeLists.txt | 1 + .../arrow/compute/kernels/aggregate_mode.cc | 7 +- .../compute/kernels/aggregate_quantile.cc | 26 +-- .../arrow/compute/kernels/aggregate_test.cc | 3 +- .../arrow/compute/kernels/codegen_internal.h | 11 +- .../compute/kernels/scalar_cast_boolean.cc | 3 +- .../compute/kernels/scalar_cast_string.cc | 3 +- .../compute/kernels/vector_statistics.cc | 208 ++++++++++++++++++ .../compute/kernels/vector_statistics_test.cc | 168 ++++++++++++++ cpp/src/arrow/compute/registry.cc | 1 + cpp/src/arrow/compute/registry_internal.h | 1 + cpp/src/arrow/type.cc | 6 + cpp/src/arrow/type.h | 4 + python/pyarrow/_compute.pyx | 21 ++ python/pyarrow/compute.py | 1 + python/pyarrow/includes/libarrow.pxd | 4 + python/pyarrow/tests/test_compute.py | 12 + 20 files changed, 480 insertions(+), 29 deletions(-) create mode 100644 cpp/src/arrow/compute/kernels/vector_statistics.cc create mode 100644 cpp/src/arrow/compute/kernels/vector_statistics_test.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index e3c596d3895..a77ac4abfbf 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -783,6 +783,7 @@ if(ARROW_COMPUTE) compute/kernels/vector_run_end_encode.cc compute/kernels/vector_select_k.cc compute/kernels/vector_sort.cc + compute/kernels/vector_statistics.cc compute/kernels/vector_swizzle.cc compute/key_hash_internal.cc compute/key_map_internal.cc diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index 53ceed1b089..012e403e705 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -142,6 +142,9 @@ static auto kSortOptionsType = GetFunctionOptionsType( static auto kPartitionNthOptionsType = GetFunctionOptionsType( DataMember("pivot", &PartitionNthOptions::pivot), DataMember("null_placement", &PartitionNthOptions::null_placement)); +static auto kWinsorizeOptionsType = GetFunctionOptionsType( + DataMember("lower_limit", &WinsorizeOptions::lower_limit), + DataMember("upper_limit", &WinsorizeOptions::upper_limit)); static auto kSelectKOptionsType = GetFunctionOptionsType( DataMember("k", &SelectKOptions::k), DataMember("sort_keys", &SelectKOptions::sort_keys)); @@ -208,6 +211,11 @@ PartitionNthOptions::PartitionNthOptions(int64_t pivot, NullPlacement null_place null_placement(null_placement) {} constexpr char PartitionNthOptions::kTypeName[]; +WinsorizeOptions::WinsorizeOptions(double lower_limit, double upper_limit) + : FunctionOptions(internal::kWinsorizeOptionsType), + lower_limit(lower_limit), + upper_limit(upper_limit) {} + SelectKOptions::SelectKOptions(int64_t k, std::vector sort_keys) : FunctionOptions(internal::kSelectKOptionsType), k(k), @@ -275,6 +283,7 @@ void RegisterVectorOptions(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunctionOptionsType(kListFlattenOptionsType)); DCHECK_OK(registry->AddFunctionOptionsType(kInversePermutationOptionsType)); DCHECK_OK(registry->AddFunctionOptionsType(kScatterOptionsType)); + DCHECK_OK(registry->AddFunctionOptionsType(kWinsorizeOptionsType)); } } // namespace internal diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 22bb1647197..69e4b243c97 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -228,6 +228,25 @@ class ARROW_EXPORT PartitionNthOptions : public FunctionOptions { NullPlacement null_placement; }; +class ARROW_EXPORT WinsorizeOptions : public FunctionOptions { + public: + WinsorizeOptions(double lower_limit, double upper_limit); + WinsorizeOptions() : WinsorizeOptions(0, 1) {} + static constexpr char const kTypeName[] = "WinsorizeOptions"; + + /// The quantile below which all values are replaced with the quantile's value. + /// + /// For example, if lower_limit = 0.05, then all values in the lower 5% percentile + /// will be replaced with the 5% percentile value. + double lower_limit; + + /// The quantile above which all values are replaced with the quantile's value. + /// + /// For example, if upper_limit = 0.95, then all values in the upper 95% percentile + /// will be replaced with the 95% percentile value. + double upper_limit; +}; + /// \brief Options for cumulative functions /// \note Also aliased as CumulativeSumOptions for backward compatibility class ARROW_EXPORT CumulativeOptions : public FunctionOptions { diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index 4dedd1f23e0..81b7adeb4aa 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -105,6 +105,7 @@ add_arrow_compute_test(vector_test vector_nested_test.cc vector_replace_test.cc vector_run_end_encode_test.cc + vector_statistics_test.cc select_k_test.cc EXTRA_LINK_LIBS arrow_compute_kernels_testing diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc index 3f84c0a5ee4..e9723cef7b0 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc @@ -495,10 +495,9 @@ void RegisterScalarAggregateMode(FunctionRegistry* registry) { ModeExecutorChunked::Exec))); for (const auto& type : NumericTypes()) { // TODO(wesm): - DCHECK_OK(func->AddKernel(NewModeKernel( - type, GenerateNumeric(*type), - GenerateNumeric( - *type)))); + DCHECK_OK(func->AddKernel( + NewModeKernel(type, GenerateNumeric(*type), + GenerateNumeric(*type)))); } // Type parameters are ignored DCHECK_OK(func->AddKernel( diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc index f4826229dd4..5e6007a0c1e 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc @@ -76,6 +76,12 @@ template double DataPointToDouble(T value, const DataType&) { return static_cast(value); } +double DataPointToDouble(const Decimal32& value, const DataType& ty) { + return value.ToDouble(checked_cast(ty).scale()); +} +double DataPointToDouble(const Decimal64& value, const DataType& ty) { + return value.ToDouble(checked_cast(ty).scale()); +} double DataPointToDouble(const Decimal128& value, const DataType& ty) { return value.ToDouble(checked_cast(ty).scale()); } @@ -524,23 +530,13 @@ void AddQuantileKernels(VectorFunction* func) { base.signature = KernelSignature::Make({InputType(ty)}, OutputType(ResolveOutput)); // output type is determined at runtime, set template argument to nulltype base.exec = GenerateNumeric(*ty); - base.exec_chunked = - GenerateNumeric( - *ty); - DCHECK_OK(func->AddKernel(base)); - } - { - base.signature = - KernelSignature::Make({InputType(Type::DECIMAL128)}, OutputType(ResolveOutput)); - base.exec = QuantileExecutor::Exec; - base.exec_chunked = QuantileExecutorChunked::Exec; + base.exec_chunked = GenerateNumeric(*ty); DCHECK_OK(func->AddKernel(base)); } - { - base.signature = - KernelSignature::Make({InputType(Type::DECIMAL256)}, OutputType(ResolveOutput)); - base.exec = QuantileExecutor::Exec; - base.exec_chunked = QuantileExecutorChunked::Exec; + for (auto type_id : DecimalTypeIds()) { + base.signature = KernelSignature::Make({type_id}, OutputType(ResolveOutput)); + base.exec = GenerateDecimal(type_id); + base.exec_chunked = GenerateDecimal(type_id); DCHECK_OK(func->AddKernel(base)); } } diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 766b5b1cd6b..ec012a42cd3 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -4263,7 +4263,8 @@ TEST(TestQuantileKernel, Decimal) { ValidateOutput(*out_array); AssertArraysEqual(*expected, *out_array, /*verbose=*/true); }; - for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) { + for (const auto& ty : + {decimal32(3, 2), decimal64(3, 2), decimal128(3, 2), decimal256(3, 2)}) { check(ArrayFromJSON(ty, R"(["1.00", "5.00", null])"), QuantileOptions(0.5, QuantileOptions::LINEAR), ArrayFromJSON(float64(), R"([3.00])")); diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h index 2a492f581f5..1d49579a5a9 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.h +++ b/cpp/src/arrow/compute/kernels/codegen_internal.h @@ -988,9 +988,9 @@ struct FailFunctor { }; // GD for numeric types (integer and floating point) -template