Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ if(ARROW_COMPUTE)
compute/registry.cc
compute/kernels/aggregate_basic.cc
compute/kernels/aggregate_mode.cc
compute/kernels/aggregate_var_std.cc
compute/kernels/codegen_internal.cc
compute/kernels/scalar_arithmetic.cc
compute/kernels/scalar_boolean.cc
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,15 @@ Result<Datum> Mode(const Datum& value, ExecContext* ctx) {
return CallFunction("mode", {value}, ctx);
}

Result<Datum> Stddev(const Datum& value, const VarianceOptions& options,
ExecContext* ctx) {
return CallFunction("stddev", {value}, &options, ctx);
}

Result<Datum> Variance(const Datum& value, const VarianceOptions& options,
ExecContext* ctx) {
return CallFunction("variance", {value}, &options, ctx);
}

} // namespace compute
} // namespace arrow
40 changes: 40 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ struct ARROW_EXPORT MinMaxOptions : public FunctionOptions {
enum Mode null_handling = SKIP;
};

/// \brief Control Delta Degrees of Freedom (ddof) of Variance and Stddev kernel
///
/// The divisor used in calculations is N - ddof, where N is the number of elements.
/// By default, ddof is zero, and population variance or stddev is returned.
struct ARROW_EXPORT VarianceOptions : public FunctionOptions {
explicit VarianceOptions(int ddof = 0) : ddof(ddof) {}

static VarianceOptions Defaults() { return VarianceOptions{}; }

int ddof = 0;
};

/// @}

/// \brief Count non-null (or null) values in an array.
Expand Down Expand Up @@ -145,5 +157,33 @@ Result<Datum> MinMax(const Datum& value,
ARROW_EXPORT
Result<Datum> Mode(const Datum& value, ExecContext* ctx = NULLPTR);

/// \brief Calculate the standard deviation of a numeric array
///
/// \param[in] value input datum, expecting Array or ChunkedArray
/// \param[in] options see VarianceOptions for more information
/// \param[in] ctx the function execution context, optional
/// \return datum of the computed standard deviation as a DoubleScalar
///
/// \since 2.0.0
/// \note API not yet finalized
ARROW_EXPORT
Result<Datum> Stddev(const Datum& value,
const VarianceOptions& options = VarianceOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Calculate the variance of a numeric array
///
/// \param[in] value input datum, expecting Array or ChunkedArray
/// \param[in] options see VarianceOptions for more information
/// \param[in] ctx the function execution context, optional
/// \return datum of the computed variance as a DoubleScalar
///
/// \since 2.0.0
/// \note API not yet finalized
ARROW_EXPORT
Result<Datum> Variance(const Datum& value,
const VarianceOptions& options = VarianceOptions::Defaults(),
ExecContext* ctx = NULLPTR);

} // namespace compute
} // namespace arrow
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunction(std::move(func)));

DCHECK_OK(registry->AddFunction(aggregate::AddModeAggKernels()));
DCHECK_OK(registry->AddFunction(aggregate::AddStddevAggKernels()));
DCHECK_OK(registry->AddFunction(aggregate::AddVarianceAggKernels()));
}

} // namespace internal
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ void AddMeanAvx512AggKernels(ScalarAggregateFunction* func);
void AddMinMaxAvx512AggKernels(ScalarAggregateFunction* func);

std::shared_ptr<ScalarAggregateFunction> AddModeAggKernels();
std::shared_ptr<ScalarAggregateFunction> AddStddevAggKernels();
std::shared_ptr<ScalarAggregateFunction> AddVarianceAggKernels();

// ----------------------------------------------------------------------
// Sum implementation
Expand Down
178 changes: 178 additions & 0 deletions cpp/src/arrow/compute/kernels/aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -942,5 +942,183 @@ TEST_F(TestInt32ModeKernel, LargeValueRange) {
CheckModeWithRange<ArrowType>(-10000000, 10000000);
}

//
// Variance/Stddev
//

template <typename ArrowType>
class TestPrimitiveVarStdKernel : public ::testing::Test {
public:
using Traits = TypeTraits<ArrowType>;
using ScalarType = typename TypeTraits<DoubleType>::ScalarType;

void AssertVarStdIs(const Array& array, const VarianceOptions& options,
double expected_var, double diff = 0) {
AssertVarStdIsInternal(array, options, expected_var, diff);
}

void AssertVarStdIs(const std::shared_ptr<ChunkedArray>& array,
const VarianceOptions& options, double expected_var,
double diff = 0) {
AssertVarStdIsInternal(array, options, expected_var, diff);
}

void AssertVarStdIs(const std::string& json, const VarianceOptions& options,
double expected_var) {
auto array = ArrayFromJSON(type_singleton(), json);
AssertVarStdIs(*array, options, expected_var);
}

void AssertVarStdIs(const std::vector<std::string>& json,
const VarianceOptions& options, double expected_var) {
auto chunked = ChunkedArrayFromJSON(type_singleton(), json);
AssertVarStdIs(chunked, options, expected_var);
}

void AssertVarStdIsInvalid(const Array& array, const VarianceOptions& options) {
AssertVarStdIsInvalidInternal(array, options);
}

void AssertVarStdIsInvalid(const std::shared_ptr<ChunkedArray>& array,
const VarianceOptions& options) {
AssertVarStdIsInvalidInternal(array, options);
}

void AssertVarStdIsInvalid(const std::string& json, const VarianceOptions& options) {
auto array = ArrayFromJSON(type_singleton(), json);
AssertVarStdIsInvalid(*array, options);
}

void AssertVarStdIsInvalid(const std::vector<std::string>& json,
const VarianceOptions& options) {
auto array = ChunkedArrayFromJSON(type_singleton(), json);
AssertVarStdIsInvalid(array, options);
}

std::shared_ptr<DataType> type_singleton() { return Traits::type_singleton(); }

private:
void AssertVarStdIsInternal(const Datum& array, const VarianceOptions& options,
double expected_var, double diff = 0) {
ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options));
ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options));
auto var = checked_cast<const ScalarType*>(out_var.scalar().get());
auto std = checked_cast<const ScalarType*>(out_std.scalar().get());
ASSERT_TRUE(var->is_valid && std->is_valid);
ASSERT_DOUBLE_EQ(std->value * std->value, var->value);
if (diff == 0) {
ASSERT_DOUBLE_EQ(var->value, expected_var); // < 4ULP
} else {
ASSERT_NEAR(var->value, expected_var, diff);
}
}

void AssertVarStdIsInvalidInternal(const Datum& array, const VarianceOptions& options) {
ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options));
ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options));
auto var = checked_cast<const ScalarType*>(out_var.scalar().get());
auto std = checked_cast<const ScalarType*>(out_std.scalar().get());
ASSERT_FALSE(var->is_valid || std->is_valid);
}
};

template <typename ArrowType>
class TestNumericVarStdKernel : public TestPrimitiveVarStdKernel<ArrowType> {};

// Reference value from numpy.var
TYPED_TEST_SUITE(TestNumericVarStdKernel, NumericArrowTypes);
TYPED_TEST(TestNumericVarStdKernel, Basics) {
VarianceOptions options; // ddof = 0, population variance/stddev

this->AssertVarStdIs("[100]", options, 0);
this->AssertVarStdIs("[1, 2, 3]", options, 0.6666666666666666);
this->AssertVarStdIs("[null, 1, 2, null, 3]", options, 0.6666666666666666);

std::vector<std::string> chunks;
chunks = {"[]", "[1]", "[2]", "[null]", "[3]"};
this->AssertVarStdIs(chunks, options, 0.6666666666666666);
chunks = {"[1, 2, 3]", "[4, 5, 6]", "[7, 8]"};
this->AssertVarStdIs(chunks, options, 5.25);
chunks = {"[1, 2, 3, 4, 5, 6, 7]", "[8]"};
this->AssertVarStdIs(chunks, options, 5.25);

this->AssertVarStdIsInvalid("[null, null, null]", options);
this->AssertVarStdIsInvalid("[]", options);
this->AssertVarStdIsInvalid("[]", options);

options.ddof = 1; // sample variance/stddev

this->AssertVarStdIs("[1, 2]", options, 0.5);

chunks = {"[1]", "[2]"};
this->AssertVarStdIs(chunks, options, 0.5);
chunks = {"[1, 2, 3]", "[4, 5, 6]", "[7, 8]"};
this->AssertVarStdIs(chunks, options, 6.0);
chunks = {"[1, 2, 3, 4, 5, 6, 7]", "[8]"};
this->AssertVarStdIs(chunks, options, 6.0);

this->AssertVarStdIsInvalid("[100]", options);
this->AssertVarStdIsInvalid("[100, null, null]", options);
chunks = {"[100]", "[null]", "[]"};
this->AssertVarStdIsInvalid(chunks, options);
}

class TestVarStdKernelStability : public TestPrimitiveVarStdKernel<DoubleType> {};

// Test numerical stability
TEST_F(TestVarStdKernelStability, Basics) {
VarianceOptions options{1}; // ddof = 1
this->AssertVarStdIs("[100000004, 100000007, 100000013, 100000016]", options, 30.0);
this->AssertVarStdIs("[1000000004, 1000000007, 1000000013, 1000000016]", options, 30.0);
}

// Calculate reference variance with Welford's online algorithm
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
std::pair<double, double> WelfordVar(const Array& array) {
const auto& array_numeric = reinterpret_cast<const DoubleArray&>(array);
const auto values = array_numeric.raw_values();
internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length());
double count = 0, mean = 0, m2 = 0;
for (int64_t i = 0; i < array.length(); ++i) {
if (reader.IsSet()) {
++count;
double delta = values[i] - mean;
mean += delta / count;
double delta2 = values[i] - mean;
m2 += delta * delta2;
}
reader.Next();
}
return std::make_pair(m2 / count, m2 / (count - 1));
}

class TestVarStdKernelRandom : public TestPrimitiveVarStdKernel<DoubleType> {};

TEST_F(TestVarStdKernelRandom, Basics) {
// Cut array into small chunks
constexpr int array_size = 5000;
constexpr int chunk_size_max = 50;
constexpr int chunk_count = array_size / chunk_size_max;

auto rand = random::RandomArrayGenerator(0x5487656);
auto array = rand.Numeric<DoubleType>(array_size, -10000.0, 100000.0, 0.1);
auto chunk_size_array = rand.Numeric<Int32Type>(chunk_count, 0, chunk_size_max);
const int* chunk_size = chunk_size_array->data()->GetValues<int>(1);
int total_size = 0;

ArrayVector array_vector;
for (int i = 0; i < chunk_count; ++i) {
array_vector.emplace_back(array->Slice(total_size, chunk_size[i]));
total_size += chunk_size[i];
}
auto chunked = *ChunkedArray::Make(array_vector);

double var_population, var_sample;
std::tie(var_population, var_sample) = WelfordVar(*(array->Slice(0, total_size)));

this->AssertVarStdIs(chunked, VarianceOptions{0}, var_population, 0.0001);
this->AssertVarStdIs(chunked, VarianceOptions{1}, var_sample, 0.0001);
}

} // namespace compute
} // namespace arrow
Loading