From 9b0327e70c5f9e33b74fdf2365145628d5e3ccf1 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 15 Jul 2021 11:27:02 -0400
Subject: [PATCH 01/10] ARROW-9056: [C++] Support aggregations over scalars
---
.../arrow/compute/kernels/aggregate_basic.cc | 81 +++++--
.../kernels/aggregate_basic_internal.h | 65 +++++-
.../arrow/compute/kernels/aggregate_mode.cc | 22 +-
.../compute/kernels/aggregate_quantile.cc | 37 ++-
.../compute/kernels/aggregate_tdigest.cc | 29 ++-
.../arrow/compute/kernels/aggregate_test.cc | 211 +++++++++++++++---
.../compute/kernels/aggregate_var_std.cc | 37 +++
cpp/src/arrow/compute/kernels/test_util.h | 33 ---
cpp/src/arrow/scalar.cc | 14 ++
9 files changed, 428 insertions(+), 101 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index 5e0454c9c4d..0cd723cbc75 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -59,10 +59,16 @@ struct CountImpl : public ScalarAggregator {
explicit CountImpl(ScalarAggregateOptions options) : options(std::move(options)) {}
Status Consume(KernelContext*, const ExecBatch& batch) override {
- const ArrayData& input = *batch[0].array();
- const int64_t nulls = input.GetNullCount();
- this->nulls += nulls;
- this->non_nulls += input.length - nulls;
+ if (batch[0].is_array()) {
+ const ArrayData& input = *batch[0].array();
+ const int64_t nulls = input.GetNullCount();
+ this->nulls += nulls;
+ this->non_nulls += input.length - nulls;
+ } else {
+ const Scalar& input = *batch[0].scalar();
+ this->nulls += !input.is_valid;
+ this->non_nulls += input.is_valid;
+ }
return Status::OK();
}
@@ -149,6 +155,12 @@ struct BooleanAnyImpl : public ScalarAggregator {
if (this->any == true) {
return Status::OK();
}
+ if (batch[0].is_scalar()) {
+ const auto& scalar = *batch[0].scalar();
+ this->has_nulls = !scalar.is_valid;
+ this->any = scalar.is_valid && checked_cast(scalar).value;
+ return Status::OK();
+ }
const auto& data = *batch[0].array();
this->has_nulls = data.GetNullCount() > 0;
arrow::internal::OptionalBinaryBitBlockCounter counter(
@@ -208,6 +220,12 @@ struct BooleanAllImpl : public ScalarAggregator {
if (!options.skip_nulls && this->has_nulls) {
return Status::OK();
}
+ if (batch[0].is_scalar()) {
+ const auto& scalar = *batch[0].scalar();
+ this->has_nulls = !scalar.is_valid;
+ this->all = !scalar.is_valid || checked_cast(scalar).value;
+ return Status::OK();
+ }
const auto& data = *batch[0].array();
this->has_nulls = data.GetNullCount() > 0;
arrow::internal::OptionalBinaryBitBlockCounter counter(
@@ -387,6 +405,26 @@ void AddBasicAggKernels(KernelInit init,
}
}
+void AddScalarAggKernels(KernelInit init,
+ const std::vector>& types,
+ std::shared_ptr out_ty,
+ ScalarAggregateFunction* func) {
+ for (const auto& ty : types) {
+ // scalar[InT] -> scalar[OutT]
+ auto sig = KernelSignature::Make({InputType::Scalar(ty)}, ValueDescr::Scalar(out_ty));
+ AddAggKernel(std::move(sig), init, func, SimdLevel::NONE);
+ }
+}
+
+void AddArrayScalarAggKernels(KernelInit init,
+ const std::vector>& types,
+ std::shared_ptr out_ty,
+ ScalarAggregateFunction* func,
+ SimdLevel::type simd_level = SimdLevel::NONE) {
+ AddBasicAggKernels(init, types, out_ty, func, simd_level);
+ AddScalarAggKernels(init, types, out_ty, func);
+}
+
void AddMinMaxKernels(KernelInit init,
const std::vector>& types,
ScalarAggregateFunction* func, SimdLevel::type simd_level) {
@@ -395,6 +433,10 @@ void AddMinMaxKernels(KernelInit init,
auto out_ty = struct_({field("min", ty), field("max", ty)});
auto sig = KernelSignature::Make({InputType::Array(ty)}, ValueDescr::Scalar(out_ty));
AddAggKernel(std::move(sig), init, func, simd_level);
+
+ // scalar[InT] -> scalar[struct]
+ sig = KernelSignature::Make({InputType::Scalar(ty)}, ValueDescr::Scalar(out_ty));
+ AddAggKernel(std::move(sig), init, func, SimdLevel::NONE);
}
}
@@ -468,17 +510,21 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
InputType any_array(ValueDescr::ARRAY);
AddAggKernel(KernelSignature::Make({any_array}, ValueDescr::Scalar(int64())),
aggregate::CountInit, func.get());
+ AddAggKernel(
+ KernelSignature::Make({InputType(ValueDescr::SCALAR)}, ValueDescr::Scalar(int64())),
+ aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
func = std::make_shared("sum", Arity::Unary(), &sum_doc,
&default_scalar_aggregate_options);
- aggregate::AddBasicAggKernels(aggregate::SumInit, {boolean()}, int64(), func.get());
- aggregate::AddBasicAggKernels(aggregate::SumInit, SignedIntTypes(), int64(),
- func.get());
- aggregate::AddBasicAggKernels(aggregate::SumInit, UnsignedIntTypes(), uint64(),
- func.get());
- aggregate::AddBasicAggKernels(aggregate::SumInit, FloatingPointTypes(), float64(),
- func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::SumInit, {boolean()}, int64(),
+ func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::SumInit, SignedIntTypes(), int64(),
+ func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::SumInit, UnsignedIntTypes(), uint64(),
+ func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::SumInit, FloatingPointTypes(), float64(),
+ func.get());
// Add the SIMD variants for sum
#if defined(ARROW_HAVE_RUNTIME_AVX2) || defined(ARROW_HAVE_RUNTIME_AVX512)
auto cpu_info = arrow::internal::CpuInfo::GetInstance();
@@ -497,9 +543,10 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
func = std::make_shared("mean", Arity::Unary(), &mean_doc,
&default_scalar_aggregate_options);
- aggregate::AddBasicAggKernels(aggregate::MeanInit, {boolean()}, float64(), func.get());
- aggregate::AddBasicAggKernels(aggregate::MeanInit, NumericTypes(), float64(),
- func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::MeanInit, {boolean()}, float64(),
+ func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::MeanInit, NumericTypes(), float64(),
+ func.get());
// Add the SIMD variants for mean
#if defined(ARROW_HAVE_RUNTIME_AVX2)
if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) {
@@ -534,13 +581,15 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
// any
func = std::make_shared("any", Arity::Unary(), &any_doc,
&default_scalar_aggregate_options);
- aggregate::AddBasicAggKernels(aggregate::AnyInit, {boolean()}, boolean(), func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::AnyInit, {boolean()}, boolean(),
+ func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
// all
func = std::make_shared("all", Arity::Unary(), &all_doc,
&default_scalar_aggregate_options);
- aggregate::AddBasicAggKernels(aggregate::AllInit, {boolean()}, boolean(), func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::AllInit, {boolean()}, boolean(),
+ func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
// index
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
index e6755c05f5d..98cb41aac6a 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
@@ -59,13 +59,22 @@ struct SumImpl : public ScalarAggregator {
using OutputType = typename TypeTraits::ScalarType;
Status Consume(KernelContext*, const ExecBatch& batch) override {
- const auto& data = batch[0].array();
- this->count = data->length - data->GetNullCount();
- if (is_boolean_type::value) {
- this->sum += static_cast(BooleanArray(data).true_count());
+ if (batch[0].is_array()) {
+ const auto& data = batch[0].array();
+ this->count = data->length - data->GetNullCount();
+ if (is_boolean_type::value) {
+ this->sum +=
+ static_cast(BooleanArray(data).true_count());
+ } else {
+ this->sum +=
+ arrow::compute::detail::SumArray(*data);
+ }
} else {
- this->sum +=
- arrow::compute::detail::SumArray(*data);
+ const auto& data = *batch[0].scalar();
+ this->count = data.is_valid;
+ if (data.is_valid) {
+ this->sum += internal::UnboxScalar::Unbox(data);
+ }
}
return Status::OK();
}
@@ -228,9 +237,29 @@ struct MinMaxImpl : public ScalarAggregator {
: out_type(out_type), options(options) {}
Status Consume(KernelContext*, const ExecBatch& batch) override {
+ if (batch[0].is_array()) {
+ return ConsumeArray(ArrayType(batch[0].array()));
+ }
+ return ConsumeScalar(*batch[0].scalar());
+ }
+
+ Status ConsumeScalar(const Scalar& scalar) {
StateType local;
+ local.has_nulls = !scalar.is_valid;
+ local.has_values = scalar.is_valid;
- ArrayType arr(batch[0].array());
+ if (local.has_nulls && !options.skip_nulls) {
+ this->state = local;
+ return Status::OK();
+ }
+
+ local.MergeOne(internal::UnboxScalar::Unbox(scalar));
+ this->state = local;
+ return Status::OK();
+ }
+
+ Status ConsumeArray(const ArrayType& arr) {
+ StateType local;
const auto null_count = arr.null_count();
local.has_nulls = null_count > 0;
@@ -344,6 +373,9 @@ struct BooleanMinMaxImpl : public MinMaxImpl {
using MinMaxImpl::options;
Status Consume(KernelContext*, const ExecBatch& batch) override {
+ if (ARROW_PREDICT_FALSE(batch[0].is_scalar())) {
+ return ConsumeScalar(checked_cast(*batch[0].scalar()));
+ }
StateType local;
ArrayType arr(batch[0].array());
@@ -366,6 +398,25 @@ struct BooleanMinMaxImpl : public MinMaxImpl {
this->state = local;
return Status::OK();
}
+
+ Status ConsumeScalar(const BooleanScalar& scalar) {
+ StateType local;
+
+ local.has_nulls = !scalar.is_valid;
+ local.has_values = scalar.is_valid;
+ if (local.has_nulls && !options.skip_nulls) {
+ this->state = local;
+ return Status::OK();
+ }
+
+ const int true_count = scalar.is_valid && scalar.value;
+ const int false_count = scalar.is_valid && !scalar.value;
+ local.max = true_count > 0;
+ local.min = false_count == 0;
+
+ this->state = local;
+ return Status::OK();
+ }
};
template
diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
index 95362335261..58198af6e2d 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
@@ -302,6 +302,22 @@ struct Moder::value>> {
SortModer impl;
};
+template
+Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
+ using CType = typename T::c_type;
+ if (scalar.is_valid) {
+ bool called = false;
+ return Finalize(ctx, out, [&]() {
+ if (!called) {
+ called = true;
+ return std::pair(UnboxScalar::Unbox(scalar), 1);
+ }
+ return std::pair(0, kCountEOF);
+ });
+ }
+ return Finalize(ctx, out, []() { return std::pair(0, kCountEOF); });
+}
+
template
struct ModeExecutor {
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
@@ -313,6 +329,10 @@ struct ModeExecutor {
return Status::Invalid("ModeOption::n must be strictly positive");
}
+ if (batch[0].is_scalar()) {
+ return ScalarMode(ctx, *batch[0].scalar(), out);
+ }
+
return Moder().impl.Exec(ctx, batch, out);
}
};
@@ -325,7 +345,7 @@ VectorKernel NewModeKernel(const std::shared_ptr& in_type) {
auto out_type =
struct_({field(kModeFieldName, in_type), field(kCountFieldName, int64())});
kernel.signature =
- KernelSignature::Make({InputType::Array(in_type)}, ValueDescr::Array(out_type));
+ KernelSignature::Make({InputType(in_type)}, ValueDescr::Array(out_type));
return kernel;
}
diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
index 0b7821273cc..5164ff80786 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
@@ -389,6 +389,36 @@ struct ExactQuantiler::value>> {
SortQuantiler impl;
};
+template
+Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
+ const Scalar& scalar, Datum* out) {
+ using CType = typename T::c_type;
+ ArrayData* output = out->mutable_array();
+ if (!scalar.is_valid) {
+ output->length = 0;
+ output->null_count = 0;
+ return Status::OK();
+ }
+ auto out_type = IsDataPoint(options) ? scalar.type : float64();
+ output->length = options.q.size();
+ output->null_count = 0;
+ ARROW_ASSIGN_OR_RAISE(
+ output->buffers[1],
+ ctx->Allocate(output->length * BitUtil::BytesForBits(GetBitWidth(*out_type))));
+ if (IsDataPoint(options)) {
+ CType* out_buffer = output->template GetMutableValues(1);
+ for (int64_t i = 0; i < output->length; i++) {
+ out_buffer[i] = UnboxScalar::Unbox(scalar);
+ }
+ } else {
+ double* out_buffer = output->template GetMutableValues(1);
+ for (int64_t i = 0; i < output->length; i++) {
+ out_buffer[i] = UnboxScalar::Unbox(scalar);
+ }
+ }
+ return Status::OK();
+}
+
template
struct QuantileExecutor {
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
@@ -406,6 +436,10 @@ struct QuantileExecutor {
}
}
+ if (batch[0].is_scalar()) {
+ return ScalarQuantile(ctx, options, *batch[0].scalar(), out);
+ }
+
return ExactQuantiler().impl.Exec(ctx, batch, out);
}
};
@@ -427,8 +461,7 @@ void AddQuantileKernels(VectorFunction* func) {
base.output_chunked = false;
for (const auto& ty : NumericTypes()) {
- base.signature =
- KernelSignature::Make({InputType::Array(ty)}, OutputType(ResolveOutput));
+ base.signature = KernelSignature::Make({InputType(ty)}, OutputType(ResolveOutput));
// output type is determined at runtime, set template argument to nulltype
base.exec = GenerateNumeric(*ty);
DCHECK_OK(func->AddKernel(base));
diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
index fb474a6b8b3..4c261604c85 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
@@ -40,16 +40,23 @@ struct TDigestImpl : public ScalarAggregator {
: q{options.q}, tdigest{options.delta, options.buffer_size} {}
Status Consume(KernelContext*, const ExecBatch& batch) override {
- const ArrayData& data = *batch[0].array();
- const CType* values = data.GetValues(1);
-
- if (data.length > data.GetNullCount()) {
- VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- this->tdigest.NanAdd(values[pos + i]);
- }
- });
+ if (batch[0].is_array()) {
+ const ArrayData& data = *batch[0].array();
+ const CType* values = data.GetValues(1);
+
+ if (data.length > data.GetNullCount()) {
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ this->tdigest.NanAdd(values[pos + i]);
+ }
+ });
+ }
+ } else {
+ const CType value = UnboxScalar::Unbox(*batch[0].scalar());
+ if (batch[0].scalar()->is_valid) {
+ this->tdigest.NanAdd(value);
+ }
}
return Status::OK();
}
@@ -125,7 +132,7 @@ void AddTDigestKernels(KernelInit init,
const std::vector>& types,
ScalarAggregateFunction* func) {
for (const auto& ty : types) {
- auto sig = KernelSignature::Make({InputType::Array(ty)}, float64());
+ auto sig = KernelSignature::Make({InputType(ty)}, float64());
AddAggKernel(std::move(sig), init, func);
}
}
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index 7318539df7f..7d3c3d7a908 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -41,6 +41,7 @@
#include "arrow/testing/gtest_common.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
#include "arrow/util/logging.h"
@@ -103,23 +104,11 @@ static Datum NaiveSum(const Array& array) {
return Datum(std::make_shared(result.first));
}
-template
void ValidateSum(
- const Array& input, Datum expected,
+ const Datum input, Datum expected,
const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults()) {
- using OutputType = typename FindAccumulatorType::Type;
-
ASSERT_OK_AND_ASSIGN(Datum result, Sum(input, options));
- DatumEqual::EnsureEqual(result, expected);
-}
-
-template
-void ValidateSum(const std::shared_ptr& input, Datum expected,
- const ScalarAggregateOptions& options) {
- using OutputType = typename FindAccumulatorType::Type;
-
- ASSERT_OK_AND_ASSIGN(Datum result, Sum(input, options));
- DatumEqual::EnsureEqual(result, expected);
+ AssertDatumsApproxEqual(expected, result, /*verbose=*/true);
}
template
@@ -127,7 +116,7 @@ void ValidateSum(
const char* json, Datum expected,
const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults()) {
auto array = ArrayFromJSON(TypeTraits::type_singleton(), json);
- ValidateSum(*array, expected, options);
+ ValidateSum(*array, expected, options);
}
template
@@ -135,13 +124,13 @@ void ValidateSum(
const std::vector& json, Datum expected,
const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults()) {
auto array = ChunkedArrayFromJSON(TypeTraits::type_singleton(), json);
- ValidateSum(array, expected, options);
+ ValidateSum(array, expected, options);
}
template
void ValidateSum(const Array& array, const ScalarAggregateOptions& options =
ScalarAggregateOptions::Defaults()) {
- ValidateSum(array, NaiveSum(array), options);
+ ValidateSum(array, NaiveSum(array), options);
}
using UnaryOp = Result(const Datum&, const ScalarAggregateOptions&, ExecContext*);
@@ -191,6 +180,13 @@ TEST(TestBooleanAggregation, Sum) {
ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/2));
ValidateBooleanAgg(json, std::make_shared(),
ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/3));
+
+ EXPECT_THAT(Sum(MakeScalar(true)),
+ ResultWith(Datum(std::make_shared(1))));
+ EXPECT_THAT(Sum(MakeScalar(false)),
+ ResultWith(Datum(std::make_shared(0))));
+ EXPECT_THAT(Sum(MakeNullScalar(boolean())),
+ ResultWith(Datum(MakeNullScalar(uint64()))));
}
TEST(TestBooleanAggregation, Mean) {
@@ -227,6 +223,11 @@ TEST(TestBooleanAggregation, Mean) {
ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/2));
ValidateBooleanAgg(json, std::make_shared(),
ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/3));
+
+ EXPECT_THAT(Mean(MakeScalar(true)), ResultWith(Datum(MakeScalar(1.0))));
+ EXPECT_THAT(Mean(MakeScalar(false)), ResultWith(Datum(MakeScalar(0.0))));
+ EXPECT_THAT(Mean(MakeNullScalar(boolean())),
+ ResultWith(Datum(MakeNullScalar(float64()))));
}
template
@@ -236,6 +237,7 @@ TYPED_TEST_SUITE(TestNumericSumKernel, NumericArrowTypes);
TYPED_TEST(TestNumericSumKernel, SimpleSum) {
using SumType = typename FindAccumulatorType::Type;
using ScalarType = typename TypeTraits::ScalarType;
+ using InputScalarType = typename TypeTraits::ScalarType;
using T = typename TypeParam::c_type;
ValidateSum("[]", Datum(std::make_shared()));
@@ -273,12 +275,17 @@ TYPED_TEST(TestNumericSumKernel, SimpleSum) {
const T expected_result = static_cast(14);
ValidateSum("[1, null, 3, null, 3, null, 7]",
Datum(std::make_shared(expected_result)), options);
+
+ EXPECT_THAT(Sum(Datum(std::make_shared(static_cast(5)))),
+ ResultWith(Datum(std::make_shared(static_cast(5)))));
+ EXPECT_THAT(Sum(MakeNullScalar(TypeTraits::type_singleton())),
+ ResultWith(Datum(MakeNullScalar(TypeTraits::type_singleton()))));
}
-TYPED_TEST_SUITE(TestNumericSumKernel, NumericArrowTypes);
TYPED_TEST(TestNumericSumKernel, ScalarAggregateOptions) {
using SumType = typename FindAccumulatorType::Type;
using ScalarType = typename TypeTraits::ScalarType;
+ using InputScalarType = typename TypeTraits::ScalarType;
using T = typename TypeParam::c_type;
const T expected_result = static_cast(14);
@@ -307,6 +314,16 @@ TYPED_TEST(TestNumericSumKernel, ScalarAggregateOptions) {
ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/4));
ValidateSum(json, null_result,
ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/5));
+
+ EXPECT_THAT(Sum(Datum(std::make_shared(static_cast(5))),
+ ScalarAggregateOptions(/*skip_nulls=*/false)),
+ ResultWith(Datum(std::make_shared(static_cast(5)))));
+ EXPECT_THAT(Sum(Datum(std::make_shared(static_cast(5))),
+ ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/2)),
+ ResultWith(Datum(MakeNullScalar(TypeTraits::type_singleton()))));
+ EXPECT_THAT(Sum(MakeNullScalar(TypeTraits::type_singleton()),
+ ScalarAggregateOptions(/*skip_nulls=*/false)),
+ ResultWith(Datum(MakeNullScalar(TypeTraits::type_singleton()))));
}
template
@@ -442,6 +459,14 @@ TYPED_TEST(TestCountKernel, SimpleCount) {
ValidateCount("[1, null, 2]", {2, 1});
ValidateCount("[null, null, null]", {0, 3});
ValidateCount("[1, 2, 3, 4, 5, 6, 7, 8, 9]", {9, 0});
+
+ auto ty = TypeTraits::type_singleton();
+ EXPECT_THAT(Count(MakeNullScalar(ty)), ResultWith(Datum(int64_t(0))));
+ EXPECT_THAT(Count(MakeNullScalar(ty), ScalarAggregateOptions(/*skip_nulls=*/false)),
+ ResultWith(Datum(int64_t(1))));
+ EXPECT_THAT(Count(*MakeScalar(ty, 1)), ResultWith(Datum(int64_t(1))));
+ EXPECT_THAT(Count(*MakeScalar(ty, 1), ScalarAggregateOptions(/*skip_nulls=*/false)),
+ ResultWith(Datum(int64_t(0))));
}
template
@@ -481,15 +506,9 @@ static Datum NaiveMean(const Array& array) {
template
void ValidateMean(const Array& input, Datum expected,
const ScalarAggregateOptions& options) {
- using OutputType = typename FindAccumulatorType::Type;
-
ASSERT_OK_AND_ASSIGN(Datum result, Mean(input, options, nullptr));
- using ScalarType = typename TypeTraits::ScalarType;
- const auto& res = checked_pointer_cast(result.scalar());
- const auto& exp = checked_pointer_cast(expected.scalar());
- if (!(std::isnan(res->value) && std::isnan(exp->value))) {
- DatumEqual::EnsureEqual(result, expected);
- }
+ auto equal_options = EqualOptions::Defaults().nans_equal(true);
+ AssertDatumsApproxEqual(expected, result, /*verbose=*/true, equal_options);
}
template
@@ -512,6 +531,8 @@ class TestMeanKernelNumeric : public ::testing::Test {};
TYPED_TEST_SUITE(TestMeanKernelNumeric, NumericArrowTypes);
TYPED_TEST(TestMeanKernelNumeric, SimpleMean) {
using ScalarType = typename TypeTraits::ScalarType;
+ using InputScalarType = typename TypeTraits::ScalarType;
+ using T = typename TypeParam::c_type;
const ScalarAggregateOptions& options =
ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/0);
@@ -534,12 +555,18 @@ TYPED_TEST(TestMeanKernelNumeric, SimpleMean) {
ValidateMean("[1, 1, 1, 1, 1, 1, 1, 1]",
Datum(std::make_shared(1.0)));
+
+ EXPECT_THAT(Mean(Datum(std::make_shared(static_cast(5)))),
+ ResultWith(Datum(std::make_shared(5.0))));
+ EXPECT_THAT(Mean(MakeNullScalar(TypeTraits::type_singleton())),
+ ResultWith(Datum(MakeNullScalar(float64()))));
}
-TYPED_TEST_SUITE(TestMeanKernelNumeric, NumericArrowTypes);
TYPED_TEST(TestMeanKernelNumeric, ScalarAggregateOptions) {
using ScalarType = typename TypeTraits::ScalarType;
- auto expected_result = Datum(std::make_shared(2));
+ using InputScalarType = typename TypeTraits::ScalarType;
+ using T = typename TypeParam::c_type;
+ auto expected_result = Datum(std::make_shared(3));
auto null_result = Datum(std::make_shared());
auto nan_result = Datum(std::make_shared(NAN));
const char* json = "[1, null, 2, 2, null, 7]";
@@ -577,6 +604,16 @@ TYPED_TEST(TestMeanKernelNumeric, ScalarAggregateOptions) {
ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/4));
ValidateMean(json, null_result,
ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/15));
+
+ EXPECT_THAT(Mean(Datum(std::make_shared(static_cast(5))),
+ ScalarAggregateOptions(/*skip_nulls=*/false)),
+ ResultWith(Datum(std::make_shared(5.0))));
+ EXPECT_THAT(Mean(Datum(std::make_shared(static_cast(5))),
+ ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/2)),
+ ResultWith(Datum(MakeNullScalar(float64()))));
+ EXPECT_THAT(Mean(MakeNullScalar(TypeTraits::type_singleton()),
+ ScalarAggregateOptions(/*skip_nulls=*/false)),
+ ResultWith(Datum(MakeNullScalar(float64()))));
}
template
@@ -696,6 +733,7 @@ TEST_F(TestBooleanMinMaxKernel, Basics) {
std::vector chunked_input1 = {"[true, true, null]", "[true, null]"};
std::vector chunked_input2 = {"[false, false, false]", "[false]"};
std::vector chunked_input3 = {"[true, null]", "[null, false]"};
+ auto ty = struct_({field("min", boolean()), field("max", boolean())});
// SKIP nulls by default
options = ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/1);
@@ -711,6 +749,13 @@ TEST_F(TestBooleanMinMaxKernel, Basics) {
this->AssertMinMaxIs(chunked_input2, false, false, options);
this->AssertMinMaxIs(chunked_input3, false, true, options);
+ Datum null_min_max = ScalarFromJSON(ty, "[null, null]");
+ Datum true_min_max = ScalarFromJSON(ty, "[true, true]");
+ Datum false_min_max = ScalarFromJSON(ty, "[false, false]");
+ EXPECT_THAT(MinMax(MakeNullScalar(boolean())), ResultWith(null_min_max));
+ EXPECT_THAT(MinMax(MakeScalar(true)), ResultWith(true_min_max));
+ EXPECT_THAT(MinMax(MakeScalar(false)), ResultWith(false_min_max));
+
options = ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/1);
this->AssertMinMaxIsNull("[]", options);
this->AssertMinMaxIsNull("[null, null, null]", options);
@@ -724,6 +769,10 @@ TEST_F(TestBooleanMinMaxKernel, Basics) {
this->AssertMinMaxIs(chunked_input2, false, false, options);
this->AssertMinMaxIsNull(chunked_input3, options);
+ options = ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/2);
+ EXPECT_THAT(MinMax(MakeNullScalar(boolean()), options), ResultWith(null_min_max));
+ EXPECT_THAT(MinMax(MakeScalar(true), options), ResultWith(true_min_max));
+
options = ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/0);
this->AssertMinMaxIsNull("[]", options);
this->AssertMinMaxIsNull("[null]", options);
@@ -735,6 +784,8 @@ TYPED_TEST(TestIntegerMinMaxKernel, Basics) {
std::vector chunked_input1 = {"[5, 1, 2, 3, 4]", "[9, 1, null, 3, 4]"};
std::vector chunked_input2 = {"[5, null, 2, 3, 4]", "[9, 1, 2, 3, 4]"};
std::vector chunked_input3 = {"[5, 1, 2, 3, null]", "[9, 1, null, 3, 4]"};
+ auto item_ty = TypeTraits::type_singleton();
+ auto ty = struct_({field("min", item_ty), field("max", item_ty)});
// SKIP nulls by default
this->AssertMinMaxIsNull("[]", options);
@@ -745,6 +796,14 @@ TYPED_TEST(TestIntegerMinMaxKernel, Basics) {
this->AssertMinMaxIs(chunked_input2, 1, 9, options);
this->AssertMinMaxIs(chunked_input3, 1, 9, options);
+ Datum null_min_max(std::make_shared(
+ ScalarVector{MakeNullScalar(item_ty), MakeNullScalar(item_ty)}, ty));
+ auto one_scalar = *MakeScalar(item_ty, static_cast(1));
+ Datum one_min_max(
+ std::make_shared(ScalarVector{one_scalar, one_scalar}, ty));
+ EXPECT_THAT(MinMax(MakeNullScalar(item_ty)), ResultWith(null_min_max));
+ EXPECT_THAT(MinMax(one_scalar), ResultWith(one_min_max));
+
options = ScalarAggregateOptions(/*skip_nulls=*/false);
this->AssertMinMaxIs("[5, 1, 2, 3, 4]", 1, 5, options);
// output null
@@ -761,6 +820,8 @@ TYPED_TEST(TestFloatingMinMaxKernel, Floats) {
std::vector chunked_input1 = {"[5, 1, 2, 3, 4]", "[9, 1, null, 3, 4]"};
std::vector chunked_input2 = {"[5, null, 2, 3, 4]", "[9, 1, 2, 3, 4]"};
std::vector chunked_input3 = {"[5, 1, 2, 3, null]", "[9, 1, null, 3, 4]"};
+ auto item_ty = TypeTraits::type_singleton();
+ auto ty = struct_({field("min", item_ty), field("max", item_ty)});
this->AssertMinMaxIs("[5, 1, 2, 3, 4]", 1, 5, options);
this->AssertMinMaxIs("[5, 1, 2, 3, 4]", 1, 5, options);
@@ -772,6 +833,14 @@ TYPED_TEST(TestFloatingMinMaxKernel, Floats) {
this->AssertMinMaxIs(chunked_input2, 1, 9, options);
this->AssertMinMaxIs(chunked_input3, 1, 9, options);
+ Datum null_min_max(std::make_shared(
+ ScalarVector{MakeNullScalar(item_ty), MakeNullScalar(item_ty)}, ty));
+ auto one_scalar = *MakeScalar(item_ty, static_cast(1));
+ Datum one_min_max(
+ std::make_shared(ScalarVector{one_scalar, one_scalar}, ty));
+ EXPECT_THAT(MinMax(MakeNullScalar(item_ty)), ResultWith(null_min_max));
+ EXPECT_THAT(MinMax(one_scalar), ResultWith(one_min_max));
+
options = ScalarAggregateOptions(/*skip_nulls=*/false);
this->AssertMinMaxIs("[5, 1, 2, 3, 4]", 1, 5, options);
this->AssertMinMaxIs("[5, -Inf, 2, 3, 4]", -INFINITY, 5, options);
@@ -998,6 +1067,10 @@ TEST_F(TestAnyKernel, Basics) {
this->AssertAnyIs(chunked_input3, false_value);
this->AssertAnyIs(chunked_input4, true_value);
+ EXPECT_THAT(Any(Datum(true)), ResultWith(Datum(true)));
+ EXPECT_THAT(Any(Datum(false)), ResultWith(Datum(false)));
+ EXPECT_THAT(Any(MakeNullScalar(boolean())), ResultWith(Datum(false)));
+
const ScalarAggregateOptions& keep_nulls = ScalarAggregateOptions(/*skip_nulls=*/false);
this->AssertAnyIs("[]", false_value, keep_nulls);
this->AssertAnyIs("[false]", false_value, keep_nulls);
@@ -1013,6 +1086,11 @@ TEST_F(TestAnyKernel, Basics) {
this->AssertAnyIs(chunked_input2, false_value, keep_nulls);
this->AssertAnyIs(chunked_input3, null_value, keep_nulls);
this->AssertAnyIs(chunked_input4, true_value, keep_nulls);
+
+ EXPECT_THAT(Any(Datum(true), keep_nulls), ResultWith(Datum(true)));
+ EXPECT_THAT(Any(Datum(false), keep_nulls), ResultWith(Datum(false)));
+ EXPECT_THAT(Any(MakeNullScalar(boolean()), keep_nulls),
+ ResultWith(Datum(MakeNullScalar(boolean()))));
}
//
@@ -1079,6 +1157,10 @@ TEST_F(TestAllKernel, Basics) {
this->AssertAllIs(chunked_input4, false_value);
this->AssertAllIs(chunked_input5, false_value);
+ EXPECT_THAT(All(Datum(true)), ResultWith(Datum(true)));
+ EXPECT_THAT(All(Datum(false)), ResultWith(Datum(false)));
+ EXPECT_THAT(All(MakeNullScalar(boolean())), ResultWith(Datum(true)));
+
const ScalarAggregateOptions keep_nulls = ScalarAggregateOptions(/*skip_nulls=*/false);
this->AssertAllIs("[]", true_value, keep_nulls);
this->AssertAllIs("[false]", false_value, keep_nulls);
@@ -1095,6 +1177,11 @@ TEST_F(TestAllKernel, Basics) {
this->AssertAllIs(chunked_input3, false_value, keep_nulls);
this->AssertAllIs(chunked_input4, false_value, keep_nulls);
this->AssertAllIs(chunked_input5, false_value, keep_nulls);
+
+ EXPECT_THAT(All(Datum(true), keep_nulls), ResultWith(Datum(true)));
+ EXPECT_THAT(All(Datum(false), keep_nulls), ResultWith(Datum(false)));
+ EXPECT_THAT(All(MakeNullScalar(boolean()), keep_nulls),
+ ResultWith(Datum(MakeNullScalar(boolean()))));
}
//
@@ -1358,6 +1445,14 @@ TEST_F(TestBooleanModeKernel, Basics) {
this->AssertModesAre("[true, null, false, false, null, true, null, null, true]", 100,
{true, false}, {3, 2});
this->AssertModesEmpty({"[null, null]", "[]", "[null]"}, 4);
+
+ auto ty = struct_({field("mode", boolean()), field("count", int64())});
+ Datum mode_true = ArrayFromJSON(ty, "[[true, 1]]");
+ Datum mode_false = ArrayFromJSON(ty, "[[false, 1]]");
+ Datum mode_empty = ArrayFromJSON(ty, "[]");
+ EXPECT_THAT(Mode(Datum(true)), ResultWith(mode_true));
+ EXPECT_THAT(Mode(Datum(false)), ResultWith(mode_false));
+ EXPECT_THAT(Mode(MakeNullScalar(boolean())), ResultWith(mode_empty));
}
TYPED_TEST_SUITE(TestIntegerModeKernel, IntegralArrowTypes);
@@ -1377,6 +1472,12 @@ TYPED_TEST(TestIntegerModeKernel, Basics) {
this->AssertModesAre("[127, 0, 127, 127, 0, 1, 0, 127]", 2, {127, 0}, {4, 3});
this->AssertModesAre("[null, null, 2, null, 1]", 3, {1, 2}, {1, 1});
this->AssertModesEmpty("[null, null, null]", 10);
+
+ auto in_ty = this->type_singleton();
+ auto ty = struct_({field("mode", in_ty), field("count", int64())});
+ EXPECT_THAT(Mode(*MakeScalar(in_ty, 5)),
+ ResultWith(Datum(ArrayFromJSON(ty, "[[5, 1]]"))));
+ EXPECT_THAT(Mode(MakeNullScalar(in_ty)), ResultWith(Datum(ArrayFromJSON(ty, "[]"))));
}
TYPED_TEST_SUITE(TestFloatingModeKernel, RealArrowTypes);
@@ -1402,6 +1503,12 @@ TYPED_TEST(TestFloatingModeKernel, Floats) {
this->AssertModesAre("[Inf, 100, Inf, 100, Inf]", 2, {INFINITY, 100}, {3, 2});
this->AssertModesAre("[NaN, NaN, 1, null, 1, 2, 2]", 3, {1, 2, NAN}, {2, 2, 2});
+
+ auto in_ty = this->type_singleton();
+ auto ty = struct_({field("mode", in_ty), field("count", int64())});
+ EXPECT_THAT(Mode(*MakeScalar(in_ty, 5.0)),
+ ResultWith(Datum(ArrayFromJSON(ty, "[[5.0, 1]]"))));
+ EXPECT_THAT(Mode(MakeNullScalar(in_ty)), ResultWith(Datum(ArrayFromJSON(ty, "[]"))));
}
TEST_F(TestInt8ModeKernelValueRange, Basics) {
@@ -1616,6 +1723,16 @@ TYPED_TEST(TestNumericVarStdKernel, Basics) {
this->AssertVarStdIsInvalid("[100, null, null]", options);
chunks = {"[100]", "[null]", "[]"};
this->AssertVarStdIsInvalid(chunks, options);
+
+ auto ty = this->type_singleton();
+ EXPECT_THAT(Stddev(*MakeScalar(ty, 5)), ResultWith(Datum(0.0)));
+ EXPECT_THAT(Variance(*MakeScalar(ty, 5)), ResultWith(Datum(0.0)));
+ EXPECT_THAT(Stddev(*MakeScalar(ty, 5), options),
+ ResultWith(Datum(MakeNullScalar(float64()))));
+ EXPECT_THAT(Variance(*MakeScalar(ty, 5), options),
+ ResultWith(Datum(MakeNullScalar(float64()))));
+ EXPECT_THAT(Stddev(MakeNullScalar(ty)), ResultWith(Datum(MakeNullScalar(float64()))));
+ EXPECT_THAT(Variance(MakeNullScalar(ty)), ResultWith(Datum(MakeNullScalar(float64()))));
}
// Test numerical stability
@@ -1934,6 +2051,19 @@ TYPED_TEST(TestIntegerQuantileKernel, Basics) {
this->AssertQuantilesEmpty("[]", {0.5});
this->AssertQuantilesEmpty("[null, null, null]", {0.1, 0.2});
this->AssertQuantilesEmpty({"[null, null]", "[]", "[null]"}, {0.3, 0.4});
+
+ auto ty = this->type_singleton();
+ for (const auto interpolation : this->interpolations_) {
+ QuantileOptions options({0.0, 0.5, 1.0}, interpolation);
+ auto expected_ty = (interpolation == QuantileOptions::LINEAR ||
+ interpolation == QuantileOptions::MIDPOINT)
+ ? float64()
+ : ty;
+ EXPECT_THAT(Quantile(*MakeScalar(ty, 1), options),
+ ResultWith(ArrayFromJSON(expected_ty, "[1, 1, 1]")));
+ EXPECT_THAT(Quantile(MakeNullScalar(ty), options),
+ ResultWith(ArrayFromJSON(expected_ty, "[]")));
+ }
}
template
@@ -1967,6 +2097,19 @@ TYPED_TEST(TestFloatingQuantileKernel, Floats) {
this->AssertQuantilesEmpty("[]", {0.5, 0.6});
this->AssertQuantilesEmpty("[null, NaN, null]", {0.1});
this->AssertQuantilesEmpty({"[NaN, NaN]", "[]", "[null]"}, {0.3, 0.4});
+
+ auto ty = this->type_singleton();
+ for (const auto interpolation : this->interpolations_) {
+ QuantileOptions options({0.0, 0.5, 1.0}, interpolation);
+ auto expected_ty = (interpolation == QuantileOptions::LINEAR ||
+ interpolation == QuantileOptions::MIDPOINT)
+ ? float64()
+ : ty;
+ EXPECT_THAT(Quantile(*MakeScalar(ty, 1), options),
+ ResultWith(ArrayFromJSON(expected_ty, "[1, 1, 1]")));
+ EXPECT_THAT(Quantile(MakeNullScalar(ty), options),
+ ResultWith(ArrayFromJSON(expected_ty, "[]")));
+ }
}
class TestInt8QuantileKernel : public TestPrimitiveQuantileKernel {};
@@ -2227,9 +2370,7 @@ TEST_F(TestRandomFloatQuantileKernel, Sliced) {
}
#endif
-class TestTDigestKernel : public ::testing::Test {};
-
-TEST_F(TestTDigestKernel, AllNullsOrNaNs) {
+TEST(TestTDigestKernel, AllNullsOrNaNs) {
const std::vector> tests = {
{"[]"},
{"[null, null]", "[]", "[null]"},
@@ -2247,5 +2388,13 @@ TEST_F(TestTDigestKernel, AllNullsOrNaNs) {
}
}
+TEST(TestTDigestKernel, Scalar) {
+ for (const auto& ty : {float64(), int64(), uint64()}) {
+ TDigestOptions options(std::vector{0.0, 0.5, 1.0});
+ EXPECT_THAT(TDigest(*MakeScalar(ty, 1), options),
+ ResultWith(ArrayFromJSON(float64(), "[1, 1, 1]")));
+ }
+}
+
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
index 29b2adce3bd..74d7b390c4f 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
@@ -180,6 +180,34 @@ struct VarStdImpl : public ScalarAggregator {
VarOrStd return_type;
};
+struct ScalarVarStdImpl : public ScalarAggregator {
+ explicit ScalarVarStdImpl(const VarianceOptions& options)
+ : options(options), seen(false) {}
+
+ Status Consume(KernelContext*, const ExecBatch& batch) override {
+ seen = batch[0].scalar()->is_valid;
+ return Status::OK();
+ }
+
+ Status MergeFrom(KernelContext*, KernelState&& src) override {
+ const auto& other = checked_cast(src);
+ seen = seen || other.seen;
+ return Status::OK();
+ }
+
+ Status Finalize(KernelContext*, Datum* out) override {
+ if (!seen || options.ddof > 0) {
+ out->value = std::make_shared();
+ } else {
+ out->value = std::make_shared(0.0);
+ }
+ return Status::OK();
+ }
+
+ const VarianceOptions options;
+ bool seen;
+};
+
struct VarStdInitState {
std::unique_ptr state;
KernelContext* ctx;
@@ -233,12 +261,21 @@ Result> VarianceInit(KernelContext* ctx,
return visitor.Create();
}
+Result> ScalarVarStdInit(KernelContext* ctx,
+ const KernelInitArgs& args) {
+ return arrow::internal::make_unique(
+ static_cast(*args.options));
+}
+
void AddVarStdKernels(KernelInit init,
const std::vector>& types,
ScalarAggregateFunction* func) {
for (const auto& ty : types) {
auto sig = KernelSignature::Make({InputType::Array(ty)}, float64());
AddAggKernel(std::move(sig), init, func);
+
+ sig = KernelSignature::Make({InputType::Scalar(ty)}, float64());
+ AddAggKernel(std::move(sig), ScalarVarStdInit, func);
}
}
diff --git a/cpp/src/arrow/compute/kernels/test_util.h b/cpp/src/arrow/compute/kernels/test_util.h
index a3fb9308f58..b10ede6f8f5 100644
--- a/cpp/src/arrow/compute/kernels/test_util.h
+++ b/cpp/src/arrow/compute/kernels/test_util.h
@@ -58,39 +58,6 @@ std::shared_ptr _MakeArray(const std::shared_ptr& type,
return result;
}
-template
-struct DatumEqual {};
-
-template
-struct DatumEqual> {
- static constexpr double kArbitraryDoubleErrorBound = 1.0;
- using ScalarType = typename TypeTraits::ScalarType;
-
- static void EnsureEqual(const Datum& lhs, const Datum& rhs) {
- ASSERT_EQ(lhs.kind(), rhs.kind());
- if (lhs.kind() == Datum::SCALAR) {
- auto left = checked_cast(lhs.scalar().get());
- auto right = checked_cast(rhs.scalar().get());
- ASSERT_EQ(left->is_valid, right->is_valid);
- ASSERT_EQ(left->type->id(), right->type->id());
- ASSERT_NEAR(left->value, right->value, kArbitraryDoubleErrorBound);
- }
- }
-};
-
-template
-struct DatumEqual> {
- using ScalarType = typename TypeTraits::ScalarType;
- static void EnsureEqual(const Datum& lhs, const Datum& rhs) {
- ASSERT_EQ(lhs.kind(), rhs.kind());
- if (lhs.kind() == Datum::SCALAR) {
- auto left = checked_cast(lhs.scalar().get());
- auto right = checked_cast(rhs.scalar().get());
- ASSERT_EQ(*left, *right);
- }
- }
-};
-
void CheckScalar(std::string func_name, const ScalarVector& inputs,
std::shared_ptr expected,
const FunctionOptions* options = nullptr);
diff --git a/cpp/src/arrow/scalar.cc b/cpp/src/arrow/scalar.cc
index 56a36114e49..c38007a1986 100644
--- a/cpp/src/arrow/scalar.cc
+++ b/cpp/src/arrow/scalar.cc
@@ -18,6 +18,7 @@
#include "arrow/scalar.h"
#include
+#include
#include
#include
@@ -562,6 +563,19 @@ Status CastImpl(const Decimal256Scalar& from, StringScalar* to) {
return Status::OK();
}
+Status CastImpl(const StructScalar& from, StringScalar* to) {
+ std::stringstream ss;
+ ss << '{';
+ for (size_t i = 0; i < from.value.size(); i++) {
+ if (i > 0) ss << ", ";
+ ss << from.type->field(i)->name() << ':' << from.type->field(i)->type()->ToString()
+ << " = " << from.value[i]->ToString();
+ }
+ ss << '}';
+ to->value = Buffer::FromString(ss.str());
+ return Status::OK();
+}
+
struct CastImplVisitor {
Status NotImplemented() {
return Status::NotImplemented("cast to ", *to_type_, " from ", *from_.type);
From 3f53328ecff64cc8132f8375c2d50bffb2d10514 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 15 Jul 2021 13:33:42 -0400
Subject: [PATCH 02/10] ARROW-9056: [C++] Fix Ruby test
---
c_glib/test/test-struct-scalar.rb | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/c_glib/test/test-struct-scalar.rb b/c_glib/test/test-struct-scalar.rb
index 917b0f4cc18..9774943ba09 100644
--- a/c_glib/test/test-struct-scalar.rb
+++ b/c_glib/test/test-struct-scalar.rb
@@ -46,7 +46,7 @@ def test_equal
end
def test_to_s
- assert_equal("...", @scalar.to_s)
+ assert_equal("{score:int8 = -29, enabled:bool = true}", @scalar.to_s)
end
def test_value
From eff3f3e40aedbc6b8d5d482e5443a6bca4cc0860 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 15 Jul 2021 13:35:32 -0400
Subject: [PATCH 03/10] ARROW-9056: [C++] Make MSVC happier
---
cpp/src/arrow/compute/kernels/aggregate_mode.cc | 2 +-
cpp/src/arrow/compute/kernels/aggregate_quantile.cc | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
index 58198af6e2d..4c0202e396d 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
@@ -312,7 +312,7 @@ Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
called = true;
return std::pair(UnboxScalar::Unbox(scalar), 1);
}
- return std::pair(0, kCountEOF);
+ return std::pair(static_cast(0), kCountEOF);
});
}
return Finalize(ctx, out, []() { return std::pair(0, kCountEOF); });
diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
index 5164ff80786..7d2ffe0770c 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
@@ -413,7 +413,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
} else {
double* out_buffer = output->template GetMutableValues(1);
for (int64_t i = 0; i < output->length; i++) {
- out_buffer[i] = UnboxScalar::Unbox(scalar);
+ out_buffer[i] = static_cast(UnboxScalar::Unbox(scalar));
}
}
return Status::OK();
From d911fc0f23775c9b0d31d5eba4b8a362ec8bcf6d Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 15 Jul 2021 13:52:30 -0400
Subject: [PATCH 04/10] ARROW-9056: [C++] Make MSVC happier 2
---
cpp/src/arrow/compute/kernels/aggregate_mode.cc | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
index 4c0202e396d..6ad0eeb6456 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
@@ -315,7 +315,9 @@ Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
return std::pair(static_cast(0), kCountEOF);
});
}
- return Finalize(ctx, out, []() { return std::pair(0, kCountEOF); });
+ return Finalize(ctx, out, []() {
+ return std::pair(static_cast(0), kCountEOF);
+ });
}
template
From f8231d904c22f3e62d6310b83828d6fe51c6dc42 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 15 Jul 2021 14:33:55 -0400
Subject: [PATCH 05/10] ARROW-9056: [C++] Unskip R tests
---
r/R/compute.R | 3 ++-
r/tests/testthat/test-compute-aggregate.R | 4 +---
2 files changed, 3 insertions(+), 4 deletions(-)
diff --git a/r/R/compute.R b/r/R/compute.R
index 4277ad8d6df..2544471aaf6 100644
--- a/r/R/compute.R
+++ b/r/R/compute.R
@@ -138,11 +138,12 @@ collect_arrays_from_dots <- function(dots) {
# Given a list that may contain both Arrays and ChunkedArrays,
# return a single ChunkedArray containing all of those chunks
# (may return a regular Array if there is only one element in dots)
- assert_that(all(map_lgl(dots, is.Array)))
+ # If there is only one element and it is a scalar, it returns the scalar
if (length(dots) == 1) {
return(dots[[1]])
}
+ assert_that(all(map_lgl(dots, is.Array)))
arrays <- unlist(lapply(dots, function(x) {
if (inherits(x, "ChunkedArray")) {
x$chunks
diff --git a/r/tests/testthat/test-compute-aggregate.R b/r/tests/testthat/test-compute-aggregate.R
index 428f799c97b..4dd929df0bf 100644
--- a/r/tests/testthat/test-compute-aggregate.R
+++ b/r/tests/testthat/test-compute-aggregate.R
@@ -65,7 +65,6 @@ test_that("sum dots", {
})
test_that("sum.Scalar", {
- skip("No sum method in arrow for Scalar: ARROW-9056")
s <- Scalar$create(4)
expect_identical(as.numeric(s), as.numeric(sum(s)))
})
@@ -104,9 +103,8 @@ test_that("mean.ChunkedArray", {
})
test_that("mean.Scalar", {
- skip("No mean method in arrow for Scalar: ARROW-9056")
s <- Scalar$create(4)
- expect_identical(as.vector(s), mean(s))
+ expect_equal(s, mean(s))
})
test_that("Bad input handling of call_function", {
From 1152442747393f65402a4e88ccca44dff60d0558 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 15 Jul 2021 14:37:07 -0400
Subject: [PATCH 06/10] ARROW-9056: [C++] Simplify MinMax registration
---
cpp/src/arrow/compute/kernels/aggregate_basic.cc | 8 ++------
1 file changed, 2 insertions(+), 6 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index 0cd723cbc75..c1aebd4cd65 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -429,14 +429,10 @@ void AddMinMaxKernels(KernelInit init,
const std::vector>& types,
ScalarAggregateFunction* func, SimdLevel::type simd_level) {
for (const auto& ty : types) {
- // array[T] -> scalar[struct]
+ // any[T] -> scalar[struct]
auto out_ty = struct_({field("min", ty), field("max", ty)});
- auto sig = KernelSignature::Make({InputType::Array(ty)}, ValueDescr::Scalar(out_ty));
+ auto sig = KernelSignature::Make({InputType(ty)}, ValueDescr::Scalar(out_ty));
AddAggKernel(std::move(sig), init, func, simd_level);
-
- // scalar[InT] -> scalar[struct]
- sig = KernelSignature::Make({InputType::Scalar(ty)}, ValueDescr::Scalar(out_ty));
- AddAggKernel(std::move(sig), init, func, SimdLevel::NONE);
}
}
From c25c2b9a8b46d00cd3188083081ebc3fda7d6664 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 15 Jul 2021 14:53:49 -0400
Subject: [PATCH 07/10] ARROW-9056: [C++] Account for length in count
aggregation
---
cpp/src/arrow/compute/exec/plan_test.cc | 24 +++++++++++++++++++
.../arrow/compute/kernels/aggregate_basic.cc | 4 ++--
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index 7d412e67c5c..b495ed0f7c4 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -447,5 +447,29 @@ TEST(ExecPlanExecution, SourceScalarAggSink) {
}))));
}
+TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+
+ BatchesWithSchema basic_data;
+ basic_data.batches = {
+ ExecBatchFromJSON({ValueDescr::Scalar(int32())}, "[[5], [5], [5]]"),
+ ExecBatchFromJSON({int32()}, "[[5], [6], [7]]")};
+ basic_data.schema = schema({field("i32", int32())});
+
+ ASSERT_OK_AND_ASSIGN(auto source,
+ MakeTestSourceNode(plan.get(), "source", basic_data,
+ /*parallel=*/false, /*slow=*/false));
+
+ ASSERT_OK_AND_ASSIGN(auto scalar_agg, MakeScalarAggregateNode(source, "scalar_agg",
+ {{"count", nullptr}}));
+
+ auto sink_gen = MakeSinkNode(scalar_agg, "sink");
+
+ ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+ Finishes(ResultWith(UnorderedElementsAreArray({
+ ExecBatchFromJSON({ValueDescr::Scalar(int64())}, "[[6]]"),
+ }))));
+}
+
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index c1aebd4cd65..a7df66695b2 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -66,8 +66,8 @@ struct CountImpl : public ScalarAggregator {
this->non_nulls += input.length - nulls;
} else {
const Scalar& input = *batch[0].scalar();
- this->nulls += !input.is_valid;
- this->non_nulls += input.is_valid;
+ this->nulls += !input.is_valid * batch.length;
+ this->non_nulls += input.is_valid * batch.length;
}
return Status::OK();
}
From 01aa31969ef63253929be045a76e157f65e7b42d Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 15 Jul 2021 16:23:41 -0400
Subject: [PATCH 08/10] Apply suggestions from code review
Co-authored-by: Benjamin Kietzman
---
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
index 98cb41aac6a..bb1d53c02ac 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
@@ -61,7 +61,7 @@ struct SumImpl : public ScalarAggregator {
Status Consume(KernelContext*, const ExecBatch& batch) override {
if (batch[0].is_array()) {
const auto& data = batch[0].array();
- this->count = data->length - data->GetNullCount();
+ this->count += data->length - data->GetNullCount();
if (is_boolean_type::value) {
this->sum +=
static_cast(BooleanArray(data).true_count());
@@ -71,9 +71,9 @@ struct SumImpl : public ScalarAggregator {
}
} else {
const auto& data = *batch[0].scalar();
- this->count = data.is_valid;
+ this->count += data.is_valid * batch.length;
if (data.is_valid) {
- this->sum += internal::UnboxScalar::Unbox(data);
+ this->sum += internal::UnboxScalar::Unbox(data) * batch.length;
}
}
return Status::OK();
From 11ede34ed801258aa9038f6aba9233bab517a1fb Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 15 Jul 2021 16:28:39 -0400
Subject: [PATCH 09/10] ARROW-9056: [C++] Test mean/sum in scalar aggregation
---
cpp/src/arrow/compute/exec/plan_test.cc | 27 ++++++++++++++++---------
1 file changed, 18 insertions(+), 9 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index b495ed0f7c4..bcb63c25b3a 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -452,23 +452,32 @@ TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
BatchesWithSchema basic_data;
basic_data.batches = {
- ExecBatchFromJSON({ValueDescr::Scalar(int32())}, "[[5], [5], [5]]"),
- ExecBatchFromJSON({int32()}, "[[5], [6], [7]]")};
- basic_data.schema = schema({field("i32", int32())});
+ ExecBatchFromJSON({ValueDescr::Scalar(int32()), ValueDescr::Scalar(int32()),
+ ValueDescr::Scalar(int32())},
+ "[[5, 5, 5], [5, 5, 5], [5, 5, 5]]"),
+ ExecBatchFromJSON({int32(), int32(), int32()},
+ "[[5, 5, 5], [6, 6, 6], [7, 7, 7]]")};
+ basic_data.schema =
+ schema({field("a", int32()), field("b", int32()), field("c", int32())});
ASSERT_OK_AND_ASSIGN(auto source,
MakeTestSourceNode(plan.get(), "source", basic_data,
/*parallel=*/false, /*slow=*/false));
- ASSERT_OK_AND_ASSIGN(auto scalar_agg, MakeScalarAggregateNode(source, "scalar_agg",
- {{"count", nullptr}}));
+ ASSERT_OK_AND_ASSIGN(
+ auto scalar_agg,
+ MakeScalarAggregateNode(source, "scalar_agg",
+ {{"count", nullptr}, {"sum", nullptr}, {"mean", nullptr}}));
auto sink_gen = MakeSinkNode(scalar_agg, "sink");
- ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
- Finishes(ResultWith(UnorderedElementsAreArray({
- ExecBatchFromJSON({ValueDescr::Scalar(int64())}, "[[6]]"),
- }))));
+ ASSERT_THAT(
+ StartAndCollect(plan.get(), sink_gen),
+ Finishes(ResultWith(UnorderedElementsAreArray({
+ ExecBatchFromJSON({ValueDescr::Scalar(int64()), ValueDescr::Scalar(int64()),
+ ValueDescr::Scalar(float64())},
+ "[[6, 33, 5.5]]"),
+ }))));
}
} // namespace compute
From 3d9d8be4ced7fcb302d973b2bd6799578139ef84 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 16 Jul 2021 08:43:49 -0400
Subject: [PATCH 10/10] ARROW-9056: [C++] Make MSVC happier
---
cpp/src/arrow/scalar.cc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cpp/src/arrow/scalar.cc b/cpp/src/arrow/scalar.cc
index c38007a1986..cb7755ba3f1 100644
--- a/cpp/src/arrow/scalar.cc
+++ b/cpp/src/arrow/scalar.cc
@@ -566,7 +566,7 @@ Status CastImpl(const Decimal256Scalar& from, StringScalar* to) {
Status CastImpl(const StructScalar& from, StringScalar* to) {
std::stringstream ss;
ss << '{';
- for (size_t i = 0; i < from.value.size(); i++) {
+ for (int i = 0; static_cast(i) < from.value.size(); i++) {
if (i > 0) ss << ", ";
ss << from.type->field(i)->name() << ':' << from.type->field(i)->type()->ToString()
<< " = " << from.value[i]->ToString();