diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 4403def9949..382a851c159 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -366,6 +366,7 @@ if(ARROW_COMPUTE) compute/kernels/aggregate_basic.cc compute/kernels/aggregate_mode.cc compute/kernels/aggregate_quantile.cc + compute/kernels/aggregate_tdigest.cc compute/kernels/aggregate_var_std.cc compute/kernels/codegen_internal.cc compute/kernels/scalar_arithmetic.cc diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc index 586eac2eeae..5afa1048960 100644 --- a/cpp/src/arrow/compute/api_aggregate.cc +++ b/cpp/src/arrow/compute/api_aggregate.cc @@ -68,5 +68,10 @@ Result Quantile(const Datum& value, const QuantileOptions& options, return CallFunction("quantile", {value}, &options, ctx); } +Result TDigest(const Datum& value, const TDigestOptions& options, + ExecContext* ctx) { + return CallFunction("tdigest", {value}, &options, ctx); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index 335186122fd..eef1587bb73 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -127,6 +127,28 @@ struct ARROW_EXPORT QuantileOptions : public FunctionOptions { enum Interpolation interpolation; }; +/// \brief Control TDigest approximate quantile kernel behavior +/// +/// By default, returns the median value. +struct ARROW_EXPORT TDigestOptions : public FunctionOptions { + explicit TDigestOptions(double q = 0.5, uint32_t delta = 100, + uint32_t buffer_size = 500) + : q{q}, delta{delta}, buffer_size{buffer_size} {} + + explicit TDigestOptions(std::vector q, uint32_t delta = 100, + uint32_t buffer_size = 500) + : q{std::move(q)}, delta{delta}, buffer_size{buffer_size} {} + + static TDigestOptions Defaults() { return TDigestOptions{}; } + + /// quantile must be between 0 and 1 inclusive + std::vector q; + /// compression parameter, default 100 + uint32_t delta; + /// input buffer size, default 500 + uint32_t buffer_size; +}; + /// @} /// \brief Count non-null (or null) values in an array. @@ -270,5 +292,19 @@ Result Quantile(const Datum& value, const QuantileOptions& options = QuantileOptions::Defaults(), ExecContext* ctx = NULLPTR); +/// \brief Calculate the approximate quantiles of a numeric array with T-Digest algorithm +/// +/// \param[in] value input datum, expecting Array or ChunkedArray +/// \param[in] options see TDigestOptions for more information +/// \param[in] ctx the function execution context, optional +/// \return resulting datum as an array +/// +/// \since 4.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result TDigest(const Datum& value, + const TDigestOptions& options = TDigestOptions::Defaults(), + ExecContext* ctx = NULLPTR); + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc index db5db543013..c90dd03c06e 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc @@ -461,11 +461,23 @@ VARIANCE_KERNEL_BENCHMARK(VarianceKernelDouble, DoubleType); // Quantile // +static std::vector deciles() { + return {0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}; +} + +static std::vector centiles() { + std::vector q(101); + for (int i = 0; i <= 100; ++i) { + q[i] = i / 100.0; + } + return q; +} + template -void QuantileKernelBench(benchmark::State& state, int min, int max) { +void QuantileKernel(benchmark::State& state, int min, int max, std::vector q) { using CType = typename TypeTraits::CType; - QuantileOptions options; + QuantileOptions options(std::move(q)); RegressionArgs args(state); const int64_t array_size = args.size / sizeof(CType); auto rand = random::RandomArrayGenerator(1926); @@ -474,29 +486,90 @@ void QuantileKernelBench(benchmark::State& state, int min, int max) { for (auto _ : state) { ABORT_NOT_OK(Quantile(array, options).status()); } + state.SetItemsProcessed(state.iterations() * array_size); +} + +template +void QuantileKernelMedian(benchmark::State& state, int min, int max) { + QuantileKernel(state, min, max, {0.5}); +} + +template +void QuantileKernelMedianWide(benchmark::State& state) { + QuantileKernel(state, 0, 1 << 24, {0.5}); +} + +template +void QuantileKernelMedianNarrow(benchmark::State& state) { + QuantileKernel(state, -30000, 30000, {0.5}); +} + +template +void QuantileKernelDecilesWide(benchmark::State& state) { + QuantileKernel(state, 0, 1 << 24, deciles()); +} + +template +void QuantileKernelDecilesNarrow(benchmark::State& state) { + QuantileKernel(state, -30000, 30000, deciles()); +} + +template +void QuantileKernelCentilesWide(benchmark::State& state) { + QuantileKernel(state, 0, 1 << 24, centiles()); +} + +template +void QuantileKernelCentilesNarrow(benchmark::State& state) { + QuantileKernel(state, -30000, 30000, centiles()); } -static void QuantileKernelBenchArgs(benchmark::internal::Benchmark* bench) { +static void QuantileKernelArgs(benchmark::internal::Benchmark* bench) { BenchmarkSetArgsWithSizes(bench, {1 * 1024 * 1024}); } -#define QUANTILE_KERNEL_BENCHMARK_WIDE(FuncName, Type) \ - static void FuncName(benchmark::State& state) { \ - QuantileKernelBench(state, 0, 1 << 24); \ - } \ - BENCHMARK(FuncName)->Apply(QuantileKernelBenchArgs) - -#define QUANTILE_KERNEL_BENCHMARK_NARROW(FuncName, Type) \ - static void FuncName(benchmark::State& state) { \ - QuantileKernelBench(state, -30000, 30000); \ - } \ - BENCHMARK(FuncName)->Apply(QuantileKernelBenchArgs) - -QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelInt32Wide, Int32Type); -QUANTILE_KERNEL_BENCHMARK_NARROW(QuantileKernelInt32Narrow, Int32Type); -QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelInt64Wide, Int64Type); -QUANTILE_KERNEL_BENCHMARK_NARROW(QuantileKernelInt64Narrow, Int64Type); -QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelDouble, DoubleType); +BENCHMARK_TEMPLATE(QuantileKernelMedianNarrow, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelMedianWide, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelMedianNarrow, Int64Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelMedianWide, Int64Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelMedianWide, DoubleType)->Apply(QuantileKernelArgs); + +BENCHMARK_TEMPLATE(QuantileKernelDecilesNarrow, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelDecilesWide, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelDecilesWide, DoubleType)->Apply(QuantileKernelArgs); + +BENCHMARK_TEMPLATE(QuantileKernelCentilesNarrow, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelCentilesWide, Int32Type)->Apply(QuantileKernelArgs); +BENCHMARK_TEMPLATE(QuantileKernelCentilesWide, DoubleType)->Apply(QuantileKernelArgs); + +static void TDigestKernelDouble(benchmark::State& state, std::vector q) { + TDigestOptions options{std::move(q)}; + RegressionArgs args(state); + const int64_t array_size = args.size / sizeof(double); + auto rand = random::RandomArrayGenerator(1926); + auto array = rand.Numeric(array_size, 0, 1 << 24, args.null_proportion); + + for (auto _ : state) { + ABORT_NOT_OK(TDigest(array, options).status()); + } + state.SetItemsProcessed(state.iterations() * array_size); +} + +static void TDigestKernelDoubleMedian(benchmark::State& state) { + TDigestKernelDouble(state, {0.5}); +} + +static void TDigestKernelDoubleDeciles(benchmark::State& state) { + TDigestKernelDouble(state, deciles()); +} + +static void TDigestKernelDoubleCentiles(benchmark::State& state) { + TDigestKernelDouble(state, centiles()); +} + +BENCHMARK(TDigestKernelDoubleMedian)->Apply(QuantileKernelArgs); +BENCHMARK(TDigestKernelDoubleDeciles)->Apply(QuantileKernelArgs); +BENCHMARK(TDigestKernelDoubleCentiles)->Apply(QuantileKernelArgs); } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc new file mode 100644 index 00000000000..fc8f43b0ae2 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/api_aggregate.h" +#include "arrow/compute/kernels/aggregate_internal.h" +#include "arrow/compute/kernels/common.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/tdigest.h" + +namespace arrow { +namespace compute { +namespace internal { + +namespace { + +using arrow::internal::TDigest; +using arrow::internal::VisitSetBitRunsVoid; + +template +struct TDigestImpl : public ScalarAggregator { + using ThisType = TDigestImpl; + using ArrayType = typename TypeTraits::ArrayType; + using CType = typename ArrowType::c_type; + + explicit TDigestImpl(const TDigestOptions& options) + : q{options.q}, tdigest{options.delta, options.buffer_size} {} + + void Consume(KernelContext*, const ExecBatch& batch) override { + const ArrayData& data = *batch[0].array(); + const CType* values = data.GetValues(1); + + if (data.length > data.GetNullCount()) { + VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, + [&](int64_t pos, int64_t len) { + for (int64_t i = 0; i < len; ++i) { + this->tdigest.NanAdd(values[pos + i]); + } + }); + } + } + + void MergeFrom(KernelContext*, KernelState&& src) override { + auto& other = checked_cast(src); + std::vector other_tdigest; + other_tdigest.push_back(std::move(other.tdigest)); + this->tdigest.Merge(&other_tdigest); + } + + void Finalize(KernelContext* ctx, Datum* out) override { + const int64_t out_length = this->tdigest.is_empty() ? 0 : this->q.size(); + auto out_data = ArrayData::Make(float64(), out_length, 0); + out_data->buffers.resize(2, nullptr); + + if (out_length > 0) { + KERNEL_ASSIGN_OR_RAISE(out_data->buffers[1], ctx, + ctx->Allocate(out_length * sizeof(double))); + double* out_buffer = out_data->template GetMutableValues(1); + for (int64_t i = 0; i < out_length; ++i) { + out_buffer[i] = this->tdigest.Quantile(this->q[i]); + } + } + + *out = Datum(std::move(out_data)); + } + + const std::vector& q; + TDigest tdigest; +}; + +struct TDigestInitState { + std::unique_ptr state; + KernelContext* ctx; + const DataType& in_type; + const TDigestOptions& options; + + TDigestInitState(KernelContext* ctx, const DataType& in_type, + const TDigestOptions& options) + : ctx(ctx), in_type(in_type), options(options) {} + + Status Visit(const DataType&) { + return Status::NotImplemented("No tdigest implemented"); + } + + Status Visit(const HalfFloatType&) { + return Status::NotImplemented("No tdigest implemented"); + } + + template + enable_if_t::value, Status> Visit(const Type&) { + state.reset(new TDigestImpl(options)); + return Status::OK(); + } + + std::unique_ptr Create() { + ctx->SetStatus(VisitTypeInline(in_type, this)); + return std::move(state); + } +}; + +std::unique_ptr TDigestInit(KernelContext* ctx, const KernelInitArgs& args) { + TDigestInitState visitor(ctx, *args.inputs[0].type, + static_cast(*args.options)); + return visitor.Create(); +} + +void AddTDigestKernels(KernelInit init, + const std::vector>& types, + ScalarAggregateFunction* func) { + for (const auto& ty : types) { + auto sig = KernelSignature::Make({InputType::Array(ty)}, float64()); + AddAggKernel(std::move(sig), init, func); + } +} + +const FunctionDoc tdigest_doc{ + "Approximate quantiles of a numeric array with T-Digest algorithm", + ("By default, 0.5 quantile (median) is returned.\n" + "Nulls and NaNs are ignored.\n" + "An empty array is returned if there is no valid data point."), + {"array"}, + "TDigestOptions"}; + +std::shared_ptr AddTDigestAggKernels() { + static auto default_tdigest_options = TDigestOptions::Defaults(); + auto func = std::make_shared( + "tdigest", Arity::Unary(), &tdigest_doc, &default_tdigest_options); + AddTDigestKernels(TDigestInit, NumericTypes(), func.get()); + return func; +} + +} // namespace + +void RegisterScalarAggregateTDigest(FunctionRegistry* registry) { + DCHECK_OK(registry->AddFunction(AddTDigestAggKernels())); +} + +} // namespace internal +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 47872565ab6..e772d474909 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -1292,7 +1292,7 @@ TYPED_TEST(TestVarStdKernelRandom, Basics) { double var_population, var_sample; using ArrayType = typename TypeTraits::ArrayType; - auto typed_array = std::static_pointer_cast(array->Slice(0, total_size)); + auto typed_array = checked_pointer_cast(array->Slice(0, total_size)); std::tie(var_population, var_sample) = WelfordVar(*typed_array); this->AssertVarStdIs(chunked, VarianceOptions{0}, var_population); @@ -1313,7 +1313,7 @@ TEST_F(TestVarStdKernelIntegerLength, Basics) { // auto array = rand.Numeric(4000000000, min, min + 100000, 0.1); double var_population, var_sample; - auto int32_array = std::static_pointer_cast(array); + auto int32_array = checked_pointer_cast(array); std::tie(var_population, var_sample) = WelfordVar(*int32_array); this->AssertVarStdIs(*array, VarianceOptions{0}, var_population); @@ -1343,22 +1343,22 @@ class TestPrimitiveQuantileKernel : public ::testing::Test { ASSERT_OK(out_array->ValidateFull()); ASSERT_EQ(out_array->length(), options.q.size()); ASSERT_EQ(out_array->null_count(), 0); - ASSERT_EQ(out_array->type(), expected[0][i].type()); + AssertTypeEqual(out_array->type(), expected[0][i].type()); - if (out_array->type() == float64()) { + if (out_array->type()->Equals(float64())) { const double* quantiles = out_array->data()->GetValues(1); for (int64_t j = 0; j < out_array->length(); ++j) { const auto& numeric_scalar = - std::static_pointer_cast(expected[j][i].scalar()); + checked_pointer_cast(expected[j][i].scalar()); ASSERT_TRUE((quantiles[j] == numeric_scalar->value) || (std::isnan(quantiles[j]) && std::isnan(numeric_scalar->value))); } } else { - ASSERT_EQ(out_array->type(), type_singleton()); + AssertTypeEqual(out_array->type(), type_singleton()); const CType* quantiles = out_array->data()->GetValues(1); for (int64_t j = 0; j < out_array->length(); ++j) { const auto& numeric_scalar = - std::static_pointer_cast>(expected[j][i].scalar()); + checked_pointer_cast>(expected[j][i].scalar()); ASSERT_EQ(quantiles[j], numeric_scalar->value); } } @@ -1530,42 +1530,86 @@ TEST_F(TestInt64QuantileKernel, Int64) { #undef O #ifndef __MINGW32__ -class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel { +class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel { public: void CheckQuantiles(int64_t array_size, int64_t num_quantiles) { - auto rand = random::RandomArrayGenerator(0x5487658); + std::shared_ptr array; + std::vector quantiles; // small value range to exercise input array with equal values and histogram approach - const auto array = rand.Numeric(array_size, -100, 200, 0.1); + GenerateTestData(array_size, num_quantiles, -100, 200, &array, &quantiles); + this->AssertQuantilesAre(array, QuantileOptions{quantiles}, + NaiveQuantile(*array, quantiles, interpolations_)); + } + + void CheckTDigests(const std::vector& chunk_sizes, int64_t num_quantiles) { + int total_size = 0; + for (int size : chunk_sizes) { + total_size += size; + } + std::shared_ptr array; std::vector quantiles; - random_real(num_quantiles, 0x5487658, 0.0, 1.0, &quantiles); - // make sure to exercise 0 and 1 quantiles - *std::min_element(quantiles.begin(), quantiles.end()) = 0; - *std::max_element(quantiles.begin(), quantiles.end()) = 1; + GenerateTestData(total_size, num_quantiles, 100, 123456789, &array, &quantiles); - this->AssertQuantilesAre(array, QuantileOptions{quantiles}, - NaiveQuantile(*array, quantiles)); + total_size = 0; + ArrayVector array_vector; + for (int size : chunk_sizes) { + array_vector.emplace_back(array->Slice(total_size, size)); + total_size += size; + } + auto chunked = *ChunkedArray::Make(array_vector); + + TDigestOptions options(quantiles); + ASSERT_OK_AND_ASSIGN(Datum out, TDigest(chunked, options)); + const auto& out_array = out.make_array(); + ASSERT_OK(out_array->ValidateFull()); + ASSERT_EQ(out_array->length(), quantiles.size()); + ASSERT_EQ(out_array->null_count(), 0); + AssertTypeEqual(out_array->type(), float64()); + + // linear interpolated exact quantile as reference + std::vector> exact = + NaiveQuantile(*array, quantiles, {QuantileOptions::LINEAR}); + const double* approx = out_array->data()->GetValues(1); + for (size_t i = 0; i < quantiles.size(); ++i) { + const auto& exact_scalar = checked_pointer_cast(exact[i][0].scalar()); + const double tolerance = std::fabs(exact_scalar->value) * 0.05; + EXPECT_NEAR(approx[i], exact_scalar->value, tolerance) << quantiles[i]; + } } private: - std::vector> NaiveQuantile(const Array& array, - const std::vector& quantiles) { + void GenerateTestData(int64_t array_size, int64_t num_quantiles, int min, int max, + std::shared_ptr* array, std::vector* quantiles) { + auto rand = random::RandomArrayGenerator(0x5487658); + *array = rand.Float64(array_size, min, max, /*null_prob=*/0.1, /*nan_prob=*/0.2); + + random_real(num_quantiles, 0x5487658, 0.0, 1.0, quantiles); + // make sure to exercise 0 and 1 quantiles + *std::min_element(quantiles->begin(), quantiles->end()) = 0; + *std::max_element(quantiles->begin(), quantiles->end()) = 1; + } + + std::vector> NaiveQuantile( + const Array& array, const std::vector& quantiles, + const std::vector& interpolations) { // copy and sort input array - std::vector input(array.length() - array.null_count()); - const int32_t* values = array.data()->GetValues(1); + std::vector input(array.length() - array.null_count()); + const double* values = array.data()->GetValues(1); const auto bitmap = array.null_bitmap_data(); int64_t index = 0; for (int64_t i = 0; i < array.length(); ++i) { - if (BitUtil::GetBit(bitmap, i)) { + if (BitUtil::GetBit(bitmap, i) && !std::isnan(values[i])) { input[index++] = values[i]; } } + input.resize(index); std::sort(input.begin(), input.end()); std::vector> output(quantiles.size(), - std::vector(interpolations_.size())); - for (uint64_t i = 0; i < interpolations_.size(); ++i) { - const auto interp = interpolations_[i]; + std::vector(interpolations.size())); + for (uint64_t i = 0; i < interpolations.size(); ++i) { + const auto interp = interpolations[i]; for (uint64_t j = 0; j < quantiles.size(); ++j) { output[j][i] = GetQuantile(input, quantiles[j], interp); } @@ -1573,7 +1617,7 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel { return output; } - Datum GetQuantile(const std::vector& input, double q, + Datum GetQuantile(const std::vector& input, double q, enum QuantileOptions::Interpolation interp) { const double index = (input.size() - 1) * q; const uint64_t lower_index = static_cast(index); @@ -1594,14 +1638,14 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel { } case QuantileOptions::LINEAR: if (fraction == 0) { - return Datum(static_cast(input[lower_index])); + return Datum(input[lower_index]); } else { return Datum(fraction * input[lower_index + 1] + (1 - fraction) * input[lower_index]); } case QuantileOptions::MIDPOINT: if (fraction == 0) { - return Datum(static_cast(input[lower_index])); + return Datum(input[lower_index]); } else { return Datum(input[lower_index] / 2.0 + input[lower_index + 1] / 2.0); } @@ -1625,7 +1669,30 @@ TEST_F(TestRandomQuantileKernel, Histogram) { // exercise histogram approach: size >= 65536, range <= 65536 this->CheckQuantiles(/*array_size=*/80000, /*num_quantiles=*/100); } + +TEST_F(TestRandomQuantileKernel, TDigest) { + this->CheckTDigests(/*chunk_sizes=*/{12345, 6789, 8765, 4321}, /*num_quantiles=*/100); +} #endif +class TestTDigestKernel : public ::testing::Test {}; + +TEST_F(TestTDigestKernel, AllNullsOrNaNs) { + const std::vector> tests = { + {"[]"}, + {"[null, null]", "[]", "[null]"}, + {"[NaN]", "[NaN, NaN]", "[]"}, + {"[null, NaN, null]"}, + {"[NaN, NaN]", "[]", "[null]"}, + }; + + for (const auto& json : tests) { + auto chunked = ChunkedArrayFromJSON(float64(), json); + ASSERT_OK_AND_ASSIGN(Datum out, TDigest(chunked, TDigestOptions())); + ASSERT_OK(out.make_array()->ValidateFull()); + ASSERT_EQ(out.array()->length, 0); + } +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/registry.cc b/cpp/src/arrow/compute/registry.cc index b1e0d48ccdc..9385c5c2a16 100644 --- a/cpp/src/arrow/compute/registry.cc +++ b/cpp/src/arrow/compute/registry.cc @@ -130,6 +130,7 @@ static std::unique_ptr CreateBuiltInRegistry() { RegisterScalarAggregateBasic(registry.get()); RegisterScalarAggregateMode(registry.get()); RegisterScalarAggregateQuantile(registry.get()); + RegisterScalarAggregateTDigest(registry.get()); RegisterScalarAggregateVariance(registry.get()); // Vector functions diff --git a/cpp/src/arrow/compute/registry_internal.h b/cpp/src/arrow/compute/registry_internal.h index 4e39eeb8204..3b0f4475328 100644 --- a/cpp/src/arrow/compute/registry_internal.h +++ b/cpp/src/arrow/compute/registry_internal.h @@ -45,6 +45,7 @@ void RegisterVectorSort(FunctionRegistry* registry); void RegisterScalarAggregateBasic(FunctionRegistry* registry); void RegisterScalarAggregateMode(FunctionRegistry* registry); void RegisterScalarAggregateQuantile(FunctionRegistry* registry); +void RegisterScalarAggregateTDigest(FunctionRegistry* registry); void RegisterScalarAggregateVariance(FunctionRegistry* registry); } // namespace internal diff --git a/cpp/src/arrow/util/tdigest.cc b/cpp/src/arrow/util/tdigest.cc index 68385a2a578..5550c98c972 100644 --- a/cpp/src/arrow/util/tdigest.cc +++ b/cpp/src/arrow/util/tdigest.cc @@ -332,6 +332,8 @@ class TDigest::TDigestImpl { return Lerp(td[ci_left].mean, td[ci_right].mean, diff); } + double total_weight() const { return total_weight_; } + private: // must be delcared before merger_, see constructor initialization list const uint32_t delta_; @@ -352,6 +354,8 @@ TDigest::TDigest(uint32_t delta, uint32_t buffer_size) : impl_(new TDigestImpl(d } TDigest::~TDigest() = default; +TDigest::TDigest(TDigest&&) = default; +TDigest& TDigest::operator=(TDigest&&) = default; void TDigest::Reset() { input_.resize(0); @@ -368,14 +372,14 @@ void TDigest::Dump() { impl_->Dump(); } -void TDigest::Merge(std::vector>* tdigests) { +void TDigest::Merge(std::vector* tdigests) { MergeInput(); std::vector tdigest_impls; tdigest_impls.reserve(tdigests->size()); for (auto& td : *tdigests) { - td->MergeInput(); - tdigest_impls.push_back(td->impl_.get()); + td.MergeInput(); + tdigest_impls.push_back(td.impl_.get()); } impl_->Merge(tdigest_impls); } @@ -385,6 +389,10 @@ double TDigest::Quantile(double q) { return impl_->Quantile(q); } +bool TDigest::is_empty() const { + return input_.size() == 0 && impl_->total_weight() == 0; +} + void TDigest::MergeInput() { if (input_.size() > 0) { impl_->MergeInput(input_); // will mutate input_ diff --git a/cpp/src/arrow/util/tdigest.h b/cpp/src/arrow/util/tdigest.h index 329926a1536..88ca6bae2c5 100644 --- a/cpp/src/arrow/util/tdigest.h +++ b/cpp/src/arrow/util/tdigest.h @@ -40,6 +40,8 @@ class ARROW_EXPORT TDigest { public: explicit TDigest(uint32_t delta = 100, uint32_t buffer_size = 500); ~TDigest(); + TDigest(TDigest&&); + TDigest& operator=(TDigest&&); // reset and re-use this tdigest void Reset(); @@ -52,6 +54,7 @@ class ARROW_EXPORT TDigest { // buffer a single data point, consume internal buffer if full // this function is intensively called and performance critical + // call it only if you are sure no NAN exists in input data void Add(double value) { DCHECK(!std::isnan(value)) << "cannot add NAN"; if (ARROW_PREDICT_FALSE(input_.size() == input_.capacity())) { @@ -60,12 +63,26 @@ class ARROW_EXPORT TDigest { input_.push_back(value); } + // skip NAN on adding + template + typename std::enable_if::value>::type NanAdd(T value) { + if (!std::isnan(value)) Add(value); + } + + template + typename std::enable_if::value>::type NanAdd(T value) { + Add(static_cast(value)); + } + // merge with other t-digests, called infrequently - void Merge(std::vector>* tdigests); + void Merge(std::vector* tdigests); // calculate quantile double Quantile(double q); + // check if this tdigest contains no valid data points + bool is_empty() const; + private: // merge input data with current tdigest void MergeInput(); diff --git a/cpp/src/arrow/util/tdigest_test.cc b/cpp/src/arrow/util/tdigest_test.cc index f17024eaac5..e9a3924f812 100644 --- a/cpp/src/arrow/util/tdigest_test.cc +++ b/cpp/src/arrow/util/tdigest_test.cc @@ -32,7 +32,6 @@ #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/testing/util.h" -#include "arrow/util/make_unique.h" #include "arrow/util/tdigest.h" namespace arrow { @@ -51,7 +50,7 @@ TEST(TDigestTest, SingleValue) { } TEST(TDigestTest, FewValues) { - // exact quantile at 0.1 intervanl, test sorted and unsorted input + // exact quantile at 0.1 interval, test sorted and unsorted input std::vector> values_vector = { {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10}, @@ -152,13 +151,13 @@ void TestMerge(const std::vector>& values_vector, uint32_t d const std::vector quantiles = {0, 0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.99, 1}; - std::vector> tds; + std::vector tds; for (const auto& values : values_vector) { - auto td = make_unique(delta); + TDigest td(delta); for (double value : values) { - td->Add(value); + td.Add(value); } - ASSERT_OK(td->Validate()); + ASSERT_OK(td.Validate()); tds.push_back(std::move(td)); } @@ -181,13 +180,13 @@ void TestMerge(const std::vector>& values_vector, uint32_t d // merge into a non empty tdigest { - std::unique_ptr td = std::move(tds[0]); + TDigest td = std::move(tds[0]); tds.erase(tds.begin(), tds.begin() + 1); - td->Merge(&tds); - ASSERT_OK(td->Validate()); + td.Merge(&tds); + ASSERT_OK(td.Validate()); for (size_t i = 0; i < quantiles.size(); ++i) { const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1); - EXPECT_NEAR(td->Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i]; + EXPECT_NEAR(td.Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i]; } } } diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index bb96dce7993..7c2eae1e63f 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -207,6 +207,8 @@ Aggregations +--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ | sum | Unary | Numeric | Scalar Numeric (4) | | +--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ +| tdigest | Unary | Numeric | Scalar Float64 | :struct:`TDigestOptions` | ++--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ | variance | Unary | Numeric | Scalar Float64 | :struct:`VarianceOptions` | +--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 0e5c6779d7c..e5a19288b87 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -965,3 +965,22 @@ class QuantileOptions(_QuantileOptions): if not isinstance(q, (list, tuple, np.ndarray)): q = [q] self._set_options(q, interpolation) + + +cdef class _TDigestOptions(FunctionOptions): + cdef: + unique_ptr[CTDigestOptions] tdigest_options + + cdef const CFunctionOptions* get_options(self) except NULL: + return self.tdigest_options.get() + + def _set_options(self, quantiles, delta, buffer_size): + self.tdigest_options.reset( + new CTDigestOptions(quantiles, delta, buffer_size)) + + +class TDigestOptions(_TDigestOptions): + def __init__(self, *, q=0.5, delta=100, buffer_size=500): + if not isinstance(q, (list, tuple, np.ndarray)): + q = [q] + self._set_options(q, delta, buffer_size) diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index e1e64a6b744..616b2de89ec 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -43,6 +43,7 @@ SortOptions, StrptimeOptions, TakeOptions, + TDigestOptions, TrimOptions, VarianceOptions, # Functions diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 6c1c7f671c7..e10ef1e3a5e 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1890,6 +1890,14 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: vector[double] q CQuantileInterp interpolation + cdef cppclass CTDigestOptions \ + "arrow::compute::TDigestOptions"(CFunctionOptions): + CTDigestOptions(vector[double] q, + unsigned int delta, unsigned int buffer_size) + vector[double] q + unsigned int delta + unsigned int buffer_size + enum DatumType" arrow::Datum::type": DatumType_NONE" arrow::Datum::NONE" DatumType_SCALAR" arrow::Datum::SCALAR" diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index 06a0269b54d..673c1387c47 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -1199,3 +1199,21 @@ def test_quantile(): pc.quantile(arr, q=1.1) with pytest.raises(ValueError, match="'zzz' is not a valid interpolation"): pc.quantile(arr, interpolation='zzz') + + +def test_tdigest(): + arr = pa.array([1, 2, 3, 4]) + result = pc.tdigest(arr) + assert result.to_pylist() == [2.5] + + arr = pa.chunked_array([pa.array([1, 2]), pa.array([3, 4])]) + result = pc.tdigest(arr) + assert result.to_pylist() == [2.5] + + arr = pa.array([1, 2, 3, 4]) + result = pc.tdigest(arr, q=[0, 0.5, 1]) + assert result.to_pylist() == [1, 2.5, 4] + + arr = pa.chunked_array([pa.array([1, 2]), pa.array([3, 4])]) + result = pc.tdigest(arr, q=[0, 0.5, 1]) + assert result.to_pylist() == [1, 2.5, 4]