Skip to content
2 changes: 1 addition & 1 deletion c_glib/test/test-struct-scalar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_equal
end

def test_to_s
  # StructScalar#to_s is expected to render each field as "name:type = value".
  expected = "{score:int8 = -29, enabled:bool = true}"
  assert_equal(expected, @scalar.to_s)
end

def test_value
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -447,5 +447,38 @@ TEST(ExecPlanExecution, SourceScalarAggSink) {
}))));
}

TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
  // Feed a mix of scalar-valued and array-valued batches through the
  // scalar-aggregate node and check count/sum/mean over column "a".
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());

  BatchesWithSchema input;
  input.batches = {
      ExecBatchFromJSON({ValueDescr::Scalar(int32()), ValueDescr::Scalar(int32()),
                         ValueDescr::Scalar(int32())},
                        "[[5, 5, 5], [5, 5, 5], [5, 5, 5]]"),
      ExecBatchFromJSON({int32(), int32(), int32()},
                        "[[5, 5, 5], [6, 6, 6], [7, 7, 7]]")};
  input.schema = schema({field("a", int32()), field("b", int32()), field("c", int32())});

  ASSERT_OK_AND_ASSIGN(auto source,
                       MakeTestSourceNode(plan.get(), "source", input,
                                          /*parallel=*/false, /*slow=*/false));

  ASSERT_OK_AND_ASSIGN(
      auto agg_node,
      MakeScalarAggregateNode(source, "scalar_agg",
                              {{"count", nullptr}, {"sum", nullptr}, {"mean", nullptr}}));

  auto sink_gen = MakeSinkNode(agg_node, "sink");

  // Six non-null rows total: count = 6, sum = 33, mean = 5.5.
  ASSERT_THAT(
      StartAndCollect(plan.get(), sink_gen),
      Finishes(ResultWith(UnorderedElementsAreArray({
          ExecBatchFromJSON({ValueDescr::Scalar(int64()), ValueDescr::Scalar(int64()),
                             ValueDescr::Scalar(float64())},
                            "[[6, 33, 5.5]]"),
      }))));
}

} // namespace compute
} // namespace arrow
81 changes: 63 additions & 18 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,16 @@ struct CountImpl : public ScalarAggregator {
explicit CountImpl(ScalarAggregateOptions options) : options(std::move(options)) {}

Status Consume(KernelContext*, const ExecBatch& batch) override {
  // Accumulate null / non-null tallies for one input batch.
  // NOTE(review): the pasted diff retained the pre-change lines above the new
  // array/scalar branch, which would have counted array input twice; only the
  // new version is kept here.
  if (batch[0].is_array()) {
    const ArrayData& input = *batch[0].array();
    const int64_t nulls = input.GetNullCount();
    this->nulls += nulls;
    this->non_nulls += input.length - nulls;
  } else {
    // A scalar stands for `batch.length` identical values, so its validity
    // contributes the full batch length to one of the two tallies.
    const Scalar& input = *batch[0].scalar();
    this->nulls += !input.is_valid * batch.length;
    this->non_nulls += input.is_valid * batch.length;
  }
  return Status::OK();
}

Expand Down Expand Up @@ -149,6 +155,12 @@ struct BooleanAnyImpl : public ScalarAggregator {
if (this->any == true) {
return Status::OK();
}
if (batch[0].is_scalar()) {
const auto& scalar = *batch[0].scalar();
this->has_nulls = !scalar.is_valid;
this->any = scalar.is_valid && checked_cast<const BooleanScalar&>(scalar).value;
return Status::OK();
}
const auto& data = *batch[0].array();
this->has_nulls = data.GetNullCount() > 0;
arrow::internal::OptionalBinaryBitBlockCounter counter(
Expand Down Expand Up @@ -208,6 +220,12 @@ struct BooleanAllImpl : public ScalarAggregator {
if (!options.skip_nulls && this->has_nulls) {
return Status::OK();
}
if (batch[0].is_scalar()) {
const auto& scalar = *batch[0].scalar();
this->has_nulls = !scalar.is_valid;
this->all = !scalar.is_valid || checked_cast<const BooleanScalar&>(scalar).value;
return Status::OK();
}
const auto& data = *batch[0].array();
this->has_nulls = data.GetNullCount() > 0;
arrow::internal::OptionalBinaryBitBlockCounter counter(
Expand Down Expand Up @@ -387,13 +405,33 @@ void AddBasicAggKernels(KernelInit init,
}
}

// Register one scalar[InT] -> scalar[OutT] aggregate kernel per input type.
void AddScalarAggKernels(KernelInit init,
                         const std::vector<std::shared_ptr<DataType>>& types,
                         std::shared_ptr<DataType> out_ty,
                         ScalarAggregateFunction* func) {
  for (const auto& in_ty : types) {
    // scalar[InT] -> scalar[OutT]
    AddAggKernel(
        KernelSignature::Make({InputType::Scalar(in_ty)}, ValueDescr::Scalar(out_ty)),
        init, func, SimdLevel::NONE);
  }
}

// Convenience: register both the array-input kernels (optionally SIMD) and
// their scalar-input counterparts for the given types.
void AddArrayScalarAggKernels(KernelInit init,
                              const std::vector<std::shared_ptr<DataType>>& types,
                              std::shared_ptr<DataType> out_ty,
                              ScalarAggregateFunction* func,
                              SimdLevel::type simd_level = SimdLevel::NONE) {
  AddBasicAggKernels(init, types, out_ty, func, simd_level);
  AddScalarAggKernels(init, types, out_ty, func);
}

// Register min/max kernels: any[T] -> scalar[struct<min: T, max: T>].
void AddMinMaxKernels(KernelInit init,
                      const std::vector<std::shared_ptr<DataType>>& types,
                      ScalarAggregateFunction* func, SimdLevel::type simd_level) {
  for (const auto& in_ty : types) {
    auto result_ty = struct_({field("min", in_ty), field("max", in_ty)});
    AddAggKernel(
        KernelSignature::Make({InputType(in_ty)}, ValueDescr::Scalar(result_ty)), init,
        func, simd_level);
  }
}
Expand Down Expand Up @@ -468,17 +506,21 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
InputType any_array(ValueDescr::ARRAY);
AddAggKernel(KernelSignature::Make({any_array}, ValueDescr::Scalar(int64())),
aggregate::CountInit, func.get());
AddAggKernel(
KernelSignature::Make({InputType(ValueDescr::SCALAR)}, ValueDescr::Scalar(int64())),
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));

func = std::make_shared<ScalarAggregateFunction>("sum", Arity::Unary(), &sum_doc,
&default_scalar_aggregate_options);
aggregate::AddBasicAggKernels(aggregate::SumInit, {boolean()}, int64(), func.get());
aggregate::AddBasicAggKernels(aggregate::SumInit, SignedIntTypes(), int64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::SumInit, UnsignedIntTypes(), uint64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::SumInit, FloatingPointTypes(), float64(),
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, {boolean()}, int64(),
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, SignedIntTypes(), int64(),
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, UnsignedIntTypes(), uint64(),
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, FloatingPointTypes(), float64(),
func.get());
// Add the SIMD variants for sum
#if defined(ARROW_HAVE_RUNTIME_AVX2) || defined(ARROW_HAVE_RUNTIME_AVX512)
auto cpu_info = arrow::internal::CpuInfo::GetInstance();
Expand All @@ -497,9 +539,10 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {

func = std::make_shared<ScalarAggregateFunction>("mean", Arity::Unary(), &mean_doc,
&default_scalar_aggregate_options);
aggregate::AddBasicAggKernels(aggregate::MeanInit, {boolean()}, float64(), func.get());
aggregate::AddBasicAggKernels(aggregate::MeanInit, NumericTypes(), float64(),
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::MeanInit, {boolean()}, float64(),
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::MeanInit, NumericTypes(), float64(),
func.get());
// Add the SIMD variants for mean
#if defined(ARROW_HAVE_RUNTIME_AVX2)
if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) {
Expand Down Expand Up @@ -534,13 +577,15 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
// any
func = std::make_shared<ScalarAggregateFunction>("any", Arity::Unary(), &any_doc,
&default_scalar_aggregate_options);
aggregate::AddBasicAggKernels(aggregate::AnyInit, {boolean()}, boolean(), func.get());
aggregate::AddArrayScalarAggKernels(aggregate::AnyInit, {boolean()}, boolean(),
func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));

// all
func = std::make_shared<ScalarAggregateFunction>("all", Arity::Unary(), &all_doc,
&default_scalar_aggregate_options);
aggregate::AddBasicAggKernels(aggregate::AllInit, {boolean()}, boolean(), func.get());
aggregate::AddArrayScalarAggKernels(aggregate::AllInit, {boolean()}, boolean(),
func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));

// index
Expand Down
65 changes: 58 additions & 7 deletions cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,22 @@ struct SumImpl : public ScalarAggregator {
using OutputType = typename TypeTraits<SumType>::ScalarType;

Status Consume(KernelContext*, const ExecBatch& batch) override {
const auto& data = batch[0].array();
this->count = data->length - data->GetNullCount();
if (is_boolean_type<ArrowType>::value) {
this->sum += static_cast<typename SumType::c_type>(BooleanArray(data).true_count());
if (batch[0].is_array()) {
const auto& data = batch[0].array();
this->count += data->length - data->GetNullCount();
if (is_boolean_type<ArrowType>::value) {
this->sum +=
static_cast<typename SumType::c_type>(BooleanArray(data).true_count());
} else {
this->sum +=
arrow::compute::detail::SumArray<CType, typename SumType::c_type>(*data);
}
} else {
this->sum +=
arrow::compute::detail::SumArray<CType, typename SumType::c_type>(*data);
const auto& data = *batch[0].scalar();
this->count += data.is_valid * batch.length;
if (data.is_valid) {
this->sum += internal::UnboxScalar<ArrowType>::Unbox(data) * batch.length;
}
}
return Status::OK();
}
Expand Down Expand Up @@ -228,9 +237,29 @@ struct MinMaxImpl : public ScalarAggregator {
: out_type(out_type), options(options) {}

Status Consume(KernelContext*, const ExecBatch& batch) override {
  // Dispatch on the physical form of the input datum.
  return batch[0].is_array() ? ConsumeArray(ArrayType(batch[0].array()))
                             : ConsumeScalar(*batch[0].scalar());
}

Status ConsumeScalar(const Scalar& scalar) {
  // Fold a single scalar value into local min/max state.
  // NOTE(review): the pasted diff left a stray pre-change line
  // (`ArrayType arr(batch[0].array());`) inside this body; `batch` is not a
  // parameter here, so that line cannot compile and has been removed.
  StateType local;
  local.has_nulls = !scalar.is_valid;
  local.has_values = scalar.is_valid;

  // A null input with skip_nulls=false makes the whole result null; stop here.
  if (local.has_nulls && !options.skip_nulls) {
    this->state = local;
    return Status::OK();
  }

  local.MergeOne(internal::UnboxScalar<ArrowType>::Unbox(scalar));
  this->state = local;
  return Status::OK();
}

Status ConsumeArray(const ArrayType& arr) {
StateType local;

const auto null_count = arr.null_count();
local.has_nulls = null_count > 0;
Expand Down Expand Up @@ -344,6 +373,9 @@ struct BooleanMinMaxImpl : public MinMaxImpl<BooleanType, SimdLevel> {
using MinMaxImpl<BooleanType, SimdLevel>::options;

Status Consume(KernelContext*, const ExecBatch& batch) override {
if (ARROW_PREDICT_FALSE(batch[0].is_scalar())) {
return ConsumeScalar(checked_cast<const BooleanScalar&>(*batch[0].scalar()));
}
StateType local;
ArrayType arr(batch[0].array());

Expand All @@ -366,6 +398,25 @@ struct BooleanMinMaxImpl : public MinMaxImpl<BooleanType, SimdLevel> {
this->state = local;
return Status::OK();
}

Status ConsumeScalar(const BooleanScalar& scalar) {
  // Boolean min/max of a single scalar: max is true iff the value is a valid
  // true; min is false iff the value is a valid false.
  StateType local;
  local.has_nulls = !scalar.is_valid;
  local.has_values = scalar.is_valid;

  if (local.has_nulls && !options.skip_nulls) {
    // Null poisons the result when nulls are not skipped.
    this->state = local;
    return Status::OK();
  }

  // Equivalent to counting true/false occurrences of the one value:
  //   max = (true_count > 0), min = (false_count == 0).
  local.max = scalar.is_valid && scalar.value;
  local.min = !scalar.is_valid || scalar.value;

  this->state = local;
  return Status::OK();
}
};

template <SimdLevel::type SimdLevel>
Expand Down
24 changes: 23 additions & 1 deletion cpp/src/arrow/compute/kernels/aggregate_mode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,24 @@ struct Moder<InType, enable_if_t<is_floating_type<InType>::value>> {
SortModer<InType> impl;
};

// Mode of a single scalar: a valid scalar yields one (value, count=1) pair,
// a null scalar yields no pairs. The generator passed to Finalize signals
// exhaustion with kCountEOF.
template <typename T>
Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
  using CType = typename T::c_type;
  if (!scalar.is_valid) {
    // No values at all: generator is immediately exhausted.
    return Finalize<T>(ctx, out, []() {
      return std::pair<CType, uint64_t>(static_cast<CType>(0), kCountEOF);
    });
  }
  int emitted = 0;
  return Finalize<T>(ctx, out, [&]() {
    if (emitted++ == 0) {
      return std::pair<CType, uint64_t>(UnboxScalar<T>::Unbox(scalar), 1);
    }
    return std::pair<CType, uint64_t>(static_cast<CType>(0), kCountEOF);
  });
}

template <typename _, typename InType>
struct ModeExecutor {
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
Expand All @@ -313,6 +331,10 @@ struct ModeExecutor {
return Status::Invalid("ModeOption::n must be strictly positive");
}

if (batch[0].is_scalar()) {
return ScalarMode<InType>(ctx, *batch[0].scalar(), out);
}

return Moder<InType>().impl.Exec(ctx, batch, out);
}
};
Expand All @@ -325,7 +347,7 @@ VectorKernel NewModeKernel(const std::shared_ptr<DataType>& in_type) {
auto out_type =
struct_({field(kModeFieldName, in_type), field(kCountFieldName, int64())});
kernel.signature =
KernelSignature::Make({InputType::Array(in_type)}, ValueDescr::Array(out_type));
KernelSignature::Make({InputType(in_type)}, ValueDescr::Array(out_type));
return kernel;
}

Expand Down
37 changes: 35 additions & 2 deletions cpp/src/arrow/compute/kernels/aggregate_quantile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,36 @@ struct ExactQuantiler<InType, enable_if_t<is_floating_type<InType>::value>> {
SortQuantiler<InType> impl;
};

// Quantiles of a single scalar: every requested quantile of one repeated
// value is that value itself. A null scalar produces an empty output array.
template <typename T>
Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
                      const Scalar& scalar, Datum* out) {
  using CType = typename T::c_type;
  ArrayData* result = out->mutable_array();
  if (!scalar.is_valid) {
    result->length = 0;
    result->null_count = 0;
    return Status::OK();
  }
  // Data-point interpolation keeps the input type; otherwise output is float64.
  const bool data_point = IsDataPoint(options);
  auto out_type = data_point ? scalar.type : float64();
  const int64_t n = static_cast<int64_t>(options.q.size());
  result->length = n;
  result->null_count = 0;
  ARROW_ASSIGN_OR_RAISE(
      result->buffers[1],
      ctx->Allocate(n * BitUtil::BytesForBits(GetBitWidth(*out_type))));
  if (data_point) {
    CType* values = result->template GetMutableValues<CType>(1);
    for (int64_t idx = 0; idx < n; ++idx) {
      values[idx] = UnboxScalar<T>::Unbox(scalar);
    }
  } else {
    double* values = result->template GetMutableValues<double>(1);
    for (int64_t idx = 0; idx < n; ++idx) {
      values[idx] = static_cast<double>(UnboxScalar<T>::Unbox(scalar));
    }
  }
  return Status::OK();
}

template <typename _, typename InType>
struct QuantileExecutor {
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
Expand All @@ -406,6 +436,10 @@ struct QuantileExecutor {
}
}

if (batch[0].is_scalar()) {
return ScalarQuantile<InType>(ctx, options, *batch[0].scalar(), out);
}

return ExactQuantiler<InType>().impl.Exec(ctx, batch, out);
}
};
Expand All @@ -427,8 +461,7 @@ void AddQuantileKernels(VectorFunction* func) {
base.output_chunked = false;

for (const auto& ty : NumericTypes()) {
base.signature =
KernelSignature::Make({InputType::Array(ty)}, OutputType(ResolveOutput));
base.signature = KernelSignature::Make({InputType(ty)}, OutputType(ResolveOutput));
// output type is determined at runtime, set template argument to nulltype
base.exec = GenerateNumeric<QuantileExecutor, NullType>(*ty);
DCHECK_OK(func->AddKernel(base));
Expand Down
Loading