Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ if(ARROW_COMPUTE)
compute/kernels/aggregate_basic.cc
compute/kernels/aggregate_mode.cc
compute/kernels/aggregate_quantile.cc
compute/kernels/aggregate_tdigest.cc
compute/kernels/aggregate_var_std.cc
compute/kernels/codegen_internal.cc
compute/kernels/scalar_arithmetic.cc
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,10 @@ Result<Datum> Quantile(const Datum& value, const QuantileOptions& options,
return CallFunction("quantile", {value}, &options, ctx);
}

Result<Datum> TDigest(const Datum& value, const TDigestOptions& options,
ExecContext* ctx) {
return CallFunction("tdigest", {value}, &options, ctx);
}

} // namespace compute
} // namespace arrow
36 changes: 36 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,28 @@ struct ARROW_EXPORT QuantileOptions : public FunctionOptions {
enum Interpolation interpolation;
};

/// \brief Control TDigest approximate quantile kernel behavior
///
/// By default, returns the median value.
struct ARROW_EXPORT TDigestOptions : public FunctionOptions {
explicit TDigestOptions(double q = 0.5, uint32_t delta = 100,
uint32_t buffer_size = 500)
: q{q}, delta{delta}, buffer_size{buffer_size} {}

explicit TDigestOptions(std::vector<double> q, uint32_t delta = 100,
uint32_t buffer_size = 500)
: q{std::move(q)}, delta{delta}, buffer_size{buffer_size} {}

static TDigestOptions Defaults() { return TDigestOptions{}; }

/// quantile must be between 0 and 1 inclusive
std::vector<double> q;
/// compression parameter, default 100
uint32_t delta;
/// input buffer size, default 500
uint32_t buffer_size;
};

/// @}

/// \brief Count non-null (or null) values in an array.
Expand Down Expand Up @@ -270,5 +292,19 @@ Result<Datum> Quantile(const Datum& value,
const QuantileOptions& options = QuantileOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Calculate the approximate quantiles of a numeric array with T-Digest algorithm
///
/// \param[in] value input datum, expecting Array or ChunkedArray
/// \param[in] options see TDigestOptions for more information
/// \param[in] ctx the function execution context, optional
/// \return resulting datum as an array
///
/// \since 4.0.0
/// \note API not yet finalized
ARROW_EXPORT
Result<Datum> TDigest(const Datum& value,
const TDigestOptions& options = TDigestOptions::Defaults(),
ExecContext* ctx = NULLPTR);

} // namespace compute
} // namespace arrow
113 changes: 93 additions & 20 deletions cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,23 @@ VARIANCE_KERNEL_BENCHMARK(VarianceKernelDouble, DoubleType);
// Quantile
//

static std::vector<double> deciles() {
return {0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0};
}

static std::vector<double> centiles() {
std::vector<double> q(101);
for (int i = 0; i <= 100; ++i) {
q[i] = i / 100.0;
}
return q;
}

template <typename ArrowType>
void QuantileKernelBench(benchmark::State& state, int min, int max) {
void QuantileKernel(benchmark::State& state, int min, int max, std::vector<double> q) {
using CType = typename TypeTraits<ArrowType>::CType;

QuantileOptions options;
QuantileOptions options(std::move(q));
RegressionArgs args(state);
const int64_t array_size = args.size / sizeof(CType);
auto rand = random::RandomArrayGenerator(1926);
Expand All @@ -474,29 +486,90 @@ void QuantileKernelBench(benchmark::State& state, int min, int max) {
for (auto _ : state) {
ABORT_NOT_OK(Quantile(array, options).status());
}
state.SetItemsProcessed(state.iterations() * array_size);
}

template <typename ArrowType>
void QuantileKernelMedian(benchmark::State& state, int min, int max) {
QuantileKernel<ArrowType>(state, min, max, {0.5});
}

template <typename ArrowType>
void QuantileKernelMedianWide(benchmark::State& state) {
QuantileKernel<ArrowType>(state, 0, 1 << 24, {0.5});
}

template <typename ArrowType>
void QuantileKernelMedianNarrow(benchmark::State& state) {
QuantileKernel<ArrowType>(state, -30000, 30000, {0.5});
}

template <typename ArrowType>
void QuantileKernelDecilesWide(benchmark::State& state) {
QuantileKernel<ArrowType>(state, 0, 1 << 24, deciles());
}

template <typename ArrowType>
void QuantileKernelDecilesNarrow(benchmark::State& state) {
QuantileKernel<ArrowType>(state, -30000, 30000, deciles());
}

template <typename ArrowType>
void QuantileKernelCentilesWide(benchmark::State& state) {
QuantileKernel<ArrowType>(state, 0, 1 << 24, centiles());
}

template <typename ArrowType>
void QuantileKernelCentilesNarrow(benchmark::State& state) {
QuantileKernel<ArrowType>(state, -30000, 30000, centiles());
}

static void QuantileKernelBenchArgs(benchmark::internal::Benchmark* bench) {
static void QuantileKernelArgs(benchmark::internal::Benchmark* bench) {
BenchmarkSetArgsWithSizes(bench, {1 * 1024 * 1024});
}

#define QUANTILE_KERNEL_BENCHMARK_WIDE(FuncName, Type) \
static void FuncName(benchmark::State& state) { \
QuantileKernelBench<Type>(state, 0, 1 << 24); \
} \
BENCHMARK(FuncName)->Apply(QuantileKernelBenchArgs)

#define QUANTILE_KERNEL_BENCHMARK_NARROW(FuncName, Type) \
static void FuncName(benchmark::State& state) { \
QuantileKernelBench<Type>(state, -30000, 30000); \
} \
BENCHMARK(FuncName)->Apply(QuantileKernelBenchArgs)

QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelInt32Wide, Int32Type);
QUANTILE_KERNEL_BENCHMARK_NARROW(QuantileKernelInt32Narrow, Int32Type);
QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelInt64Wide, Int64Type);
QUANTILE_KERNEL_BENCHMARK_NARROW(QuantileKernelInt64Narrow, Int64Type);
QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelDouble, DoubleType);
BENCHMARK_TEMPLATE(QuantileKernelMedianNarrow, Int32Type)->Apply(QuantileKernelArgs);
BENCHMARK_TEMPLATE(QuantileKernelMedianWide, Int32Type)->Apply(QuantileKernelArgs);
BENCHMARK_TEMPLATE(QuantileKernelMedianNarrow, Int64Type)->Apply(QuantileKernelArgs);
BENCHMARK_TEMPLATE(QuantileKernelMedianWide, Int64Type)->Apply(QuantileKernelArgs);
BENCHMARK_TEMPLATE(QuantileKernelMedianWide, DoubleType)->Apply(QuantileKernelArgs);

BENCHMARK_TEMPLATE(QuantileKernelDecilesNarrow, Int32Type)->Apply(QuantileKernelArgs);
BENCHMARK_TEMPLATE(QuantileKernelDecilesWide, Int32Type)->Apply(QuantileKernelArgs);
BENCHMARK_TEMPLATE(QuantileKernelDecilesWide, DoubleType)->Apply(QuantileKernelArgs);

BENCHMARK_TEMPLATE(QuantileKernelCentilesNarrow, Int32Type)->Apply(QuantileKernelArgs);
BENCHMARK_TEMPLATE(QuantileKernelCentilesWide, Int32Type)->Apply(QuantileKernelArgs);
BENCHMARK_TEMPLATE(QuantileKernelCentilesWide, DoubleType)->Apply(QuantileKernelArgs);

static void TDigestKernelDouble(benchmark::State& state, std::vector<double> q) {
TDigestOptions options{std::move(q)};
RegressionArgs args(state);
const int64_t array_size = args.size / sizeof(double);
auto rand = random::RandomArrayGenerator(1926);
auto array = rand.Numeric<DoubleType>(array_size, 0, 1 << 24, args.null_proportion);

for (auto _ : state) {
ABORT_NOT_OK(TDigest(array, options).status());
}
state.SetItemsProcessed(state.iterations() * array_size);
}

static void TDigestKernelDoubleMedian(benchmark::State& state) {
TDigestKernelDouble(state, {0.5});
}

static void TDigestKernelDoubleDeciles(benchmark::State& state) {
TDigestKernelDouble(state, deciles());
}

static void TDigestKernelDoubleCentiles(benchmark::State& state) {
TDigestKernelDouble(state, centiles());
}

BENCHMARK(TDigestKernelDoubleMedian)->Apply(QuantileKernelArgs);
BENCHMARK(TDigestKernelDoubleDeciles)->Apply(QuantileKernelArgs);
BENCHMARK(TDigestKernelDoubleCentiles)->Apply(QuantileKernelArgs);

} // namespace compute
} // namespace arrow
153 changes: 153 additions & 0 deletions cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/kernels/aggregate_internal.h"
#include "arrow/compute/kernels/common.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/tdigest.h"

namespace arrow {
namespace compute {
namespace internal {

namespace {

using arrow::internal::TDigest;
using arrow::internal::VisitSetBitRunsVoid;

template <typename ArrowType>
struct TDigestImpl : public ScalarAggregator {
using ThisType = TDigestImpl<ArrowType>;
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using CType = typename ArrowType::c_type;

explicit TDigestImpl(const TDigestOptions& options)
: q{options.q}, tdigest{options.delta, options.buffer_size} {}

void Consume(KernelContext*, const ExecBatch& batch) override {
const ArrayData& data = *batch[0].array();
const CType* values = data.GetValues<CType>(1);

if (data.length > data.GetNullCount()) {
VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
[&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
this->tdigest.NanAdd(values[pos + i]);
}
});
}
}

void MergeFrom(KernelContext*, KernelState&& src) override {
auto& other = checked_cast<ThisType&>(src);
std::vector<TDigest> other_tdigest;
other_tdigest.push_back(std::move(other.tdigest));
this->tdigest.Merge(&other_tdigest);
}

void Finalize(KernelContext* ctx, Datum* out) override {
const int64_t out_length = this->tdigest.is_empty() ? 0 : this->q.size();
auto out_data = ArrayData::Make(float64(), out_length, 0);
out_data->buffers.resize(2, nullptr);

if (out_length > 0) {
KERNEL_ASSIGN_OR_RAISE(out_data->buffers[1], ctx,
ctx->Allocate(out_length * sizeof(double)));
double* out_buffer = out_data->template GetMutableValues<double>(1);
for (int64_t i = 0; i < out_length; ++i) {
out_buffer[i] = this->tdigest.Quantile(this->q[i]);
}
}

*out = Datum(std::move(out_data));
}

const std::vector<double>& q;
TDigest tdigest;
};

struct TDigestInitState {
std::unique_ptr<KernelState> state;
KernelContext* ctx;
const DataType& in_type;
const TDigestOptions& options;

TDigestInitState(KernelContext* ctx, const DataType& in_type,
const TDigestOptions& options)
: ctx(ctx), in_type(in_type), options(options) {}

Status Visit(const DataType&) {
return Status::NotImplemented("No tdigest implemented");
}

Status Visit(const HalfFloatType&) {
return Status::NotImplemented("No tdigest implemented");
}

template <typename Type>
enable_if_t<is_number_type<Type>::value, Status> Visit(const Type&) {
state.reset(new TDigestImpl<Type>(options));
return Status::OK();
}

std::unique_ptr<KernelState> Create() {
ctx->SetStatus(VisitTypeInline(in_type, this));
return std::move(state);
}
};

std::unique_ptr<KernelState> TDigestInit(KernelContext* ctx, const KernelInitArgs& args) {
TDigestInitState visitor(ctx, *args.inputs[0].type,
static_cast<const TDigestOptions&>(*args.options));
return visitor.Create();
}

void AddTDigestKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
ScalarAggregateFunction* func) {
for (const auto& ty : types) {
auto sig = KernelSignature::Make({InputType::Array(ty)}, float64());
AddAggKernel(std::move(sig), init, func);
}
}

const FunctionDoc tdigest_doc{
"Approximate quantiles of a numeric array with T-Digest algorithm",
("By default, 0.5 quantile (median) is returned.\n"
"Nulls and NaNs are ignored.\n"
"An empty array is returned if there is no valid data point."),
{"array"},
"TDigestOptions"};

std::shared_ptr<ScalarAggregateFunction> AddTDigestAggKernels() {
static auto default_tdigest_options = TDigestOptions::Defaults();
auto func = std::make_shared<ScalarAggregateFunction>(
"tdigest", Arity::Unary(), &tdigest_doc, &default_tdigest_options);
AddTDigestKernels(TDigestInit, NumericTypes(), func.get());
return func;
}

} // namespace

void RegisterScalarAggregateTDigest(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunction(AddTDigestAggKernels()));
}

} // namespace internal
} // namespace compute
} // namespace arrow
Loading